Skip to content

Commit 124c546

Browse files
committed
Update to 0.0.4-SNAPSHOT
- Add AreaDependentQueue AreaDependentQueue is capable of dynamically ordering tasks scheduled (possibly) onto overlapping x/y coordinates. For a given coordinate, only one task at most is being executed. - Add stream id (long) to PrioritisedExecutor This will be later used by thread pools that may perform different scheduling depending on the stream id. For example, it may be possible that tasks with a lower stream id are executed before other tasks, or that tasks with different stream ids are shared equally. At this stage, I've not yet decided. - Add BalancedPrioritisedThreadPool This thread pool will share time equally between groups but will not share time equally between queues within the groups. The task execution order is determined by priority then by suborder. This results is more fair task execution as determined by queue size (i.e more tasks in one queue should generally receive more time). - Rewrite spinwait logic in PrioritisedQueueExecutorThread 1. Do not yield and park immediately upon emptying the queue Instead, set the parked field. This allows wakeup to occur if a task is queued while short parking. 2. Only spinwait if configured to - Fix PrioritisedQueueExecutorThread#close(wait=true, ...) not waiting Need to only break when the thread dies. Forgot to invert the alive check. - Add COWArrayList The underlying array is not Object[] but is actually of the element type. The list also works only through reference equality as well. The array is retrievable for faster iteration. - Change coordinates from x/z to x/y coordinates in ReentrantAreaLock - Use AtomicLong for size is ConcurrentLong2ReferenceChainedHashTable We really don't see any benefit from using LongAdder since on additions we need to sum over the entire adder (defeating the purpose of splitting the additions for parallelism). There should be a minor improvement in put/remove performance, even in contended scenarios. - Remove ConcurrentUtil#rethrow Use ThrowUtil#throwUnchecked - Remove PrioritisedThreadPool Use BalancedPrioritisedThreadPool, or in the future even StreamOrderedThreadPool
1 parent f150999 commit 124c546

22 files changed

+2793
-946
lines changed

gradle/wrapper/gradle-wrapper.jar

181 Bytes
Binary file not shown.

gradle/wrapper/gradle-wrapper.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
distributionBase=GRADLE_USER_HOME
22
distributionPath=wrapper/dists
3-
distributionUrl=https\://services.gradle.org/distributions/gradle-8.11.1-bin.zip
3+
distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.3-bin.zip
44
networkTimeout=10000
55
validateDistributionUrl=true
66
zipStoreBase=GRADLE_USER_HOME

gradlew

Lines changed: 4 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gradlew.bat

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/main/java/ca/spottedleaf/concurrentutil/collection/MultiThreadedQueue.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package ca.spottedleaf.concurrentutil.collection;
22

33
import ca.spottedleaf.concurrentutil.util.ConcurrentUtil;
4+
import ca.spottedleaf.concurrentutil.util.ThrowUtil;
45
import ca.spottedleaf.concurrentutil.util.Validate;
5-
66
import java.lang.invoke.VarHandle;
77
import java.util.ArrayList;
88
import java.util.Collection;
@@ -1134,7 +1134,7 @@ protected final E removeHead() {
11341134
* @return The total number of elements drained.
11351135
*/
11361136
public int drain(final Consumer<E> consumer) {
1137-
return this.drain(consumer, false, ConcurrentUtil::rethrow);
1137+
return this.drain(consumer, false, ThrowUtil::throwUnchecked);
11381138
}
11391139

11401140
/**
@@ -1154,7 +1154,7 @@ public int drain(final Consumer<E> consumer) {
11541154
* @return The total number of elements drained.
11551155
*/
11561156
public int drain(final Consumer<E> consumer, final boolean preventAdds) {
1157-
return this.drain(consumer, preventAdds, ConcurrentUtil::rethrow);
1157+
return this.drain(consumer, preventAdds, ThrowUtil::throwUnchecked);
11581158
}
11591159

11601160
/**

src/main/java/ca/spottedleaf/concurrentutil/executor/PrioritisedExecutor.java

Lines changed: 52 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,16 @@ public interface PrioritisedExecutor {
9292
* @param task The task to run.
9393
* @param priority The priority for the task.
9494
* @param subOrder The task's suborder.
95+
* @param stream The task's stream id.
9596
*
9697
* @throws IllegalStateException If this executor has shutdown.
9798
* @throws NullPointerException If the task is null
9899
* @throws IllegalArgumentException If the priority is invalid.
99100
* @return {@code null} if the current thread immediately executed the task, else returns the prioritised task
100101
* associated with the parameter
101102
*/
102-
public PrioritisedTask queueTask(final Runnable task, final Priority priority, final long subOrder);
103+
public PrioritisedTask queueTask(final Runnable task, final Priority priority, final long subOrder,
104+
final long stream);
103105

104106
/**
105107
* Creates, but does not queue or execute, a task at {@link Priority#NORMAL} priority.
@@ -130,13 +132,15 @@ public interface PrioritisedExecutor {
130132
* @param task The task to run.
131133
* @param priority The priority for the task.
132134
* @param subOrder The task's suborder.
135+
* @param stream The task's stream.
133136
*
134137
* @throws NullPointerException If the task is null
135138
* @throws IllegalArgumentException If the priority is invalid.
136139
* @return {@code null} if the current thread immediately executed the task, else returns the prioritised task
137140
* associated with the parameter
138141
*/
139-
public PrioritisedTask createTask(final Runnable task, final Priority priority, final long subOrder);
142+
public PrioritisedTask createTask(final Runnable task, final Priority priority, final long subOrder,
143+
final long stream);
140144

141145
public static interface PrioritisedTask extends Cancellable {
142146

@@ -220,7 +224,7 @@ public static interface PrioritisedTask extends Cancellable {
220224
public boolean lowerPriority(final Priority priority);
221225

222226
/**
223-
* Returns the suborder id associated with this task.
227+
* Returns the suborder id associated with this task, or 0 if completing.
224228
* @return The suborder id associated with this task.
225229
*/
226230
public long getSubOrder();
@@ -232,7 +236,7 @@ public static interface PrioritisedTask extends Cancellable {
232236
* @param subOrder Specified new sub order.
233237
*
234238
* @return {@code true} if successful, {@code false} if this task is completing or has completed or the queue
235-
* this task was scheduled on was shutdown, or if the current suborder is the same as the new sub order.
239+
* this task was scheduled on was shutdown, or if the current suborder is the same as the new suborder.
236240
*/
237241
public boolean setSubOrder(final long subOrder);
238242

@@ -257,15 +261,54 @@ public static interface PrioritisedTask extends Cancellable {
257261
public boolean lowerSubOrder(final long subOrder);
258262

259263
/**
260-
* Sets the priority and suborder id associated with this task. Ths function has no effect when this task
264+
* Returns the stream id associated with this task, or 0 if completing.
265+
* @return The stream id associated with this task.
266+
*/
267+
public long getStream();
268+
269+
/**
270+
* Sets the stream id associated with this task. Ths function has no effect when this task
261271
* is completing or is completed.
262272
*
263-
* @param priority Priority specified
273+
* @param stream Specified new stream.
274+
*
275+
* @return {@code true} if successful, {@code false} if this task is completing or has completed or the queue
276+
* this task was scheduled on was shutdown, or if the current stream is the same as the new stream.
277+
*/
278+
public boolean setStream(final long stream);
279+
280+
/**
281+
* Sets the priority, suborder id, and stream id associated with this task. Ths function has no effect when
282+
* this task is completing or is completed.
283+
*
284+
* @param priority Specified new priority.
264285
* @param subOrder Specified new sub order.
286+
* @param stream Specified new stream.
265287
* @return {@code true} if successful, {@code false} if this task is completing or has completed or the queue
266-
* this task was scheduled on was shutdown, or if the current priority and suborder are the same as
267-
* the parameters.
288+
* this task was scheduled on was shutdown, or if the current priority. suborder, and stream are the same
289+
* as the parameters.
268290
*/
269-
public boolean setPriorityAndSubOrder(final Priority priority, final long subOrder);
291+
public boolean setPrioritySubOrderStream(final Priority priority, final long subOrder,
292+
final long stream);
293+
294+
/**
295+
* Atomically retrieves the priority, suborder, and stream for this task. Returns {@code null} if the task
296+
* is completing or cancelled.
297+
* @return The current priority state, or {@code null} if completing or cancelled.
298+
*/
299+
public PriorityState getPriorityState();
300+
}
301+
302+
public static record PriorityState(Priority priority, long subOrder, long stream) implements Comparable<PriorityState> {
303+
304+
@Override
305+
public int compareTo(final PriorityState other) {
306+
final int priorityCompare = this.priority.priority - other.priority.priority;
307+
if (priorityCompare != 0) {
308+
return priorityCompare;
309+
}
310+
311+
return Long.compare(this.subOrder, other.subOrder);
312+
}
270313
}
271314
}

0 commit comments

Comments
 (0)