99import java .util .concurrent .ConcurrentSkipListMap ;
1010import java .util .concurrent .atomic .AtomicBoolean ;
1111import java .util .concurrent .atomic .AtomicLong ;
12+ import java .util .function .Consumer ;
1213
1314public final class PrioritisedTaskQueue implements PrioritisedExecutor {
1415
@@ -26,6 +27,7 @@ public final class PrioritisedTaskQueue implements PrioritisedExecutor {
2627 private final AtomicLong subOrderGenerator ;
2728 private final AtomicBoolean shutdown = new AtomicBoolean ();
2829 private final ConcurrentSkipListMap <PrioritisedQueuedTask .Holder , Boolean > tasks ;
30+ private final Consumer <PrioritisedTask > queueHook ;
2931
3032 public PrioritisedTaskQueue () {
3133 this (new AtomicLong ());
@@ -36,8 +38,13 @@ public PrioritisedTaskQueue(final AtomicLong subOrderGenerator) {
3638 }
3739
3840 public PrioritisedTaskQueue (final AtomicLong subOrderGenerator , final long flags ) {
41+ this (subOrderGenerator , flags , null );
42+ }
43+
44+ public PrioritisedTaskQueue (final AtomicLong subOrderGenerator , final long flags , final Consumer <PrioritisedTask > queueHook ) {
3945 this .subOrderGenerator = subOrderGenerator ;
4046 this .tasks = new ConcurrentSkipListMap <>(((flags & FLAG_ORDER_BY_STREAM ) != 0L ) ? PrioritisedQueuedTask .COMPARATOR_STREAM : PrioritisedQueuedTask .COMPARATOR );
47+ this .queueHook = queueHook ;
4148 }
4249
4350 @ Override
@@ -281,6 +288,10 @@ public boolean queue() {
281288 }
282289 }
283290
291+ final Consumer <PrioritisedTask > queueHook = PrioritisedTaskQueue .this .queueHook ;
292+ if (queueHook != null ) {
293+ queueHook .accept (this );
294+ }
284295
285296 return true ;
286297 }
@@ -347,6 +358,22 @@ public Priority getPriority() {
347358 }
348359 }
349360
361+ private void updateHolder (final int priority , final long subOrder , final long stream , final long id ) {
362+ final Holder oldHolder = this .holder ;
363+ if (oldHolder == null ) {
364+ return ;
365+ }
366+
367+ final Holder newHolder = new Holder (this , priority , subOrder , stream , id );
368+ this .holder = newHolder ;
369+
370+ PrioritisedTaskQueue .this .tasks .put (newHolder , Boolean .TRUE );
371+
372+ if (oldHolder .markRemoved ()) {
373+ PrioritisedTaskQueue .this .tasks .remove (oldHolder );
374+ }
375+ }
376+
350377 @ Override
351378 public boolean setPriority (final Priority priority ) {
352379 synchronized (this ) {
@@ -355,14 +382,7 @@ public boolean setPriority(final Priority priority) {
355382 }
356383
357384 this .priority = priority ;
358-
359- if (this .holder != null ) {
360- if (this .holder .markRemoved ()) {
361- PrioritisedTaskQueue .this .tasks .remove (this .holder );
362- }
363- this .holder = new Holder (this , priority .priority , this .subOrder , this .stream , this .id );
364- PrioritisedTaskQueue .this .tasks .put (this .holder , Boolean .TRUE );
365- }
385+ this .updateHolder (priority .priority , this .subOrder , this .stream , this .id );
366386
367387 return true ;
368388 }
@@ -376,14 +396,7 @@ public boolean raisePriority(final Priority priority) {
376396 }
377397
378398 this .priority = priority ;
379-
380- if (this .holder != null ) {
381- if (this .holder .markRemoved ()) {
382- PrioritisedTaskQueue .this .tasks .remove (this .holder );
383- }
384- this .holder = new Holder (this , priority .priority , this .subOrder , this .stream , this .id );
385- PrioritisedTaskQueue .this .tasks .put (this .holder , Boolean .TRUE );
386- }
399+ this .updateHolder (priority .priority , this .subOrder , this .stream , this .id );
387400
388401 return true ;
389402 }
@@ -397,14 +410,7 @@ public boolean lowerPriority(Priority priority) {
397410 }
398411
399412 this .priority = priority ;
400-
401- if (this .holder != null ) {
402- if (this .holder .markRemoved ()) {
403- PrioritisedTaskQueue .this .tasks .remove (this .holder );
404- }
405- this .holder = new Holder (this , priority .priority , this .subOrder , this .stream , this .id );
406- PrioritisedTaskQueue .this .tasks .put (this .holder , Boolean .TRUE );
407- }
413+ this .updateHolder (priority .priority , this .subOrder , this .stream , this .id );
408414
409415 return true ;
410416 }
@@ -425,14 +431,7 @@ public boolean setSubOrder(final long subOrder) {
425431 }
426432
427433 this .subOrder = subOrder ;
428-
429- if (this .holder != null ) {
430- if (this .holder .markRemoved ()) {
431- PrioritisedTaskQueue .this .tasks .remove (this .holder );
432- }
433- this .holder = new Holder (this , priority .priority , this .subOrder , this .stream , this .id );
434- PrioritisedTaskQueue .this .tasks .put (this .holder , Boolean .TRUE );
435- }
434+ this .updateHolder (this .priority .priority , subOrder , this .stream , this .id );
436435
437436 return true ;
438437 }
@@ -446,14 +445,7 @@ public boolean raiseSubOrder(long subOrder) {
446445 }
447446
448447 this .subOrder = subOrder ;
449-
450- if (this .holder != null ) {
451- if (this .holder .markRemoved ()) {
452- PrioritisedTaskQueue .this .tasks .remove (this .holder );
453- }
454- this .holder = new Holder (this , priority .priority , this .subOrder , this .stream , this .id );
455- PrioritisedTaskQueue .this .tasks .put (this .holder , Boolean .TRUE );
456- }
448+ this .updateHolder (this .priority .priority , subOrder , this .stream , this .id );
457449
458450 return true ;
459451 }
@@ -467,14 +459,7 @@ public boolean lowerSubOrder(final long subOrder) {
467459 }
468460
469461 this .subOrder = subOrder ;
470-
471- if (this .holder != null ) {
472- if (this .holder .markRemoved ()) {
473- PrioritisedTaskQueue .this .tasks .remove (this .holder );
474- }
475- this .holder = new Holder (this , priority .priority , this .subOrder , this .stream , this .id );
476- PrioritisedTaskQueue .this .tasks .put (this .holder , Boolean .TRUE );
477- }
462+ this .updateHolder (this .priority .priority , subOrder , this .stream , this .id );
478463
479464 return true ;
480465 }
@@ -495,14 +480,7 @@ public boolean setStream(final long stream) {
495480 }
496481
497482 this .stream = stream ;
498-
499- if (this .holder != null ) {
500- if (this .holder .markRemoved ()) {
501- PrioritisedTaskQueue .this .tasks .remove (this .holder );
502- }
503- this .holder = new Holder (this , priority .priority , this .subOrder , this .stream , this .id );
504- PrioritisedTaskQueue .this .tasks .put (this .holder , Boolean .TRUE );
505- }
483+ this .updateHolder (this .priority .priority , this .subOrder , stream , this .id );
506484
507485 return true ;
508486 }
@@ -519,14 +497,7 @@ public boolean setPrioritySubOrderStream(final Priority priority, final long sub
519497 this .priority = priority ;
520498 this .subOrder = subOrder ;
521499 this .stream = stream ;
522-
523- if (this .holder != null ) {
524- if (this .holder .markRemoved ()) {
525- PrioritisedTaskQueue .this .tasks .remove (this .holder );
526- }
527- this .holder = new Holder (this , priority .priority , this .subOrder , this .stream , this .id );
528- PrioritisedTaskQueue .this .tasks .put (this .holder , Boolean .TRUE );
529- }
500+ this .updateHolder (priority .priority , subOrder , stream , this .id );
530501
531502 return true ;
532503 }
0 commit comments