/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.concurrency.ScheduledThreadPoolExecutor;

import hunt.concurrency.atomic.AtomicHelper;
import hunt.concurrency.BlockingQueue;
import hunt.concurrency.Delayed;
import hunt.concurrency.Future;
import hunt.concurrency.FutureTask;
import hunt.concurrency.ScheduledExecutorService;
import hunt.concurrency.thread;
import hunt.concurrency.ThreadFactory;
import hunt.concurrency.ThreadPoolExecutor;

import hunt.collection;
import hunt.Exceptions;
import hunt.util.Common;
import hunt.util.DateTime;
import hunt.util.Runnable;
import hunt.Object;
import hunt.logging;
// import core.time;

import core.atomic;
import core.sync.condition;
import core.sync.mutex;

import std.datetime;
// import hunt.collection.AbstractQueue;
// import java.util.Arrays;
// import hunt.collection.Collection;
// import hunt.collection.Iterator;
// import java.util.List;
// import java.util.NoSuchElementException;
// import java.util.Objects;
// import hunt.concurrency.atomic.AtomicLong;
// import hunt.concurrency.locks.Condition;
// import hunt.concurrency.locks.ReentrantLock;

alias ReentrantLock = Mutex;

interface IScheduledFutureTask {
    void heapIndex(int index);
    int heapIndex();
}
/**
 * A {@link ThreadPoolExecutor} that can additionally schedule
 * commands to run after a given delay, or to execute periodically.
 * This class is preferable to {@link java.util.Timer} when multiple
 * worker threads are needed, or when the additional flexibility or
 * capabilities of {@link ThreadPoolExecutor} (which this class
 * extends) are required.
 *
 * <p>Delayed tasks execute no sooner than they are enabled, but
 * without any real-time guarantees about when, after they are
 * enabled, they will commence. Tasks scheduled for exactly the same
 * execution time are enabled in first-in-first-out (FIFO) order of
 * submission.
 *
 * <p>When a submitted task is cancelled before it is run, execution
 * is suppressed. By default, such a cancelled task is not
 * automatically removed from the work queue until its delay elapses.
 * While this enables further inspection and monitoring, it may also
 * cause unbounded retention of cancelled tasks. To avoid this, use
 * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
 * removed from the work queue at time of cancellation.
 *
 * <p>Successive executions of a periodic task scheduled via
 * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
 * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
 * do not overlap. While different executions may be performed by
 * different threads, the effects of prior executions
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * those of subsequent ones.
 *
 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
 * of the inherited tuning methods are not useful for it. In
 * particular, because it acts as a fixed-sized pool using
 * {@code corePoolSize} threads and an unbounded queue, adjustments
 * to {@code maximumPoolSize} have no useful effect. Additionally, it
 * is almost never a good idea to set {@code corePoolSize} to zero or
 * use {@code allowCoreThreadTimeOut} because this may leave the pool
 * without threads to handle tasks once they become eligible to run.
 *
 * <p>As with {@code ThreadPoolExecutor}, if not otherwise specified,
 * this class uses {@link Executors#defaultThreadFactory} as the
 * default thread factory, and {@link ThreadPoolExecutor.AbortPolicy}
 * as the default rejected execution handler.
 *
 * <p><b>Extension notes:</b> This class overrides the
 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
 * {@link AbstractExecutorService#submit(Runnable) submit}
 * methods to generate internal {@link ScheduledFuture} objects to
 * control per-task delays and scheduling. To preserve
 * functionality, any further overrides of these methods in
 * subclasses must invoke superclass versions, which effectively
 * disables additional task customization. However, this class
 * provides alternative protected extension methods
 * {@code decorateTask} (one version each for {@code Runnable} and
 * {@code Callable}) that can be used to customize the concrete task
 * types used to execute commands entered via {@code execute},
 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
 * and {@code scheduleWithFixedDelay}. By default, a
 * {@code ScheduledThreadPoolExecutor} uses a task type extending
 * {@link FutureTask}. However, this may be modified or replaced using
 * subclasses of the form:
 *
 * <pre> {@code
 * class CustomScheduledExecutor : ScheduledThreadPoolExecutor {
 *
 *   static class CustomTask(V) : RunnableScheduledFuture!(V) { ... }
 *
 *   protected RunnableScheduledFuture!(V) decorateTask(V)(
 *                Runnable r, RunnableScheduledFuture!(V) task) {
 *       return new CustomTask!(V)(r, task);
 *   }
 *
 *   protected RunnableScheduledFuture!(V) decorateTask(V)(
 *                Callable!(V) c, RunnableScheduledFuture!(V) task) {
 *       return new CustomTask!(V)(c, task);
 *   }
 *   // ... add constructors, etc.
 * }}</pre>
 *
 * @author Doug Lea
 */
class ScheduledThreadPoolExecutor : ThreadPoolExecutor, ScheduledExecutorService {

    /*
     * This class specializes ThreadPoolExecutor implementation by
     *
     * 1. Using a custom task type ScheduledFutureTask, even for tasks
     *    that don't require scheduling because they are submitted
     *    using ExecutorService rather than ScheduledExecutorService
     *    methods, which are treated as tasks with a delay of zero.
     *
     * 2. Using a custom queue (DelayedWorkQueue), a variant of
     *    unbounded DelayQueue. The lack of capacity constraint and
     *    the fact that corePoolSize and maximumPoolSize are
     *    effectively identical simplifies some execution mechanics
     *    (see delayedExecute) compared to ThreadPoolExecutor.
     *
     * 3. Supporting optional run-after-shutdown parameters, which
     *    leads to overrides of shutdown methods to remove and cancel
     *    tasks that should NOT be run after shutdown, as well as
     *    different recheck logic when task (re)submission overlaps
     *    with a shutdown.
     *
     * 4. Task decoration methods to allow interception and
     *    instrumentation, which are needed because subclasses cannot
     *    otherwise override submit methods to get this effect. These
     *    don't have any impact on pool control logic though.
     */
    /**
     * False if should cancel/suppress periodic tasks on shutdown.
     */
    private bool continueExistingPeriodicTasksAfterShutdown;

    /**
     * False if should cancel non-periodic not-yet-expired tasks on shutdown.
     */
    private bool executeExistingDelayedTasksAfterShutdown = true;

    /**
     * True if ScheduledFutureTask.cancel should remove from queue.
     */
    bool removeOnCancel;

    /**
     * Sequence number to break scheduling ties, and in turn to
     * guarantee FIFO order among tied entries.
     */
    private shared static long sequencer; //= new AtomicLong();

    /**
     * Returns true if can run a task given current run state and
     * run-after-shutdown parameters.
     */
    bool canRunInCurrentRunState(V)(RunnableScheduledFuture!V task) {
        if (!isShutdown())
            return true;
        if (isStopped())
            return false;
        return task.isPeriodic()
            ? continueExistingPeriodicTasksAfterShutdown
            : (executeExistingDelayedTasksAfterShutdown
                || task.getDelay() <= Duration.zero);
    }

    /**
     * Main execution method for delayed or periodic tasks. If pool
     * is shut down, rejects the task. Otherwise adds task to queue
     * and starts a thread, if necessary, to run it. (We cannot
     * prestart the thread to run the task because the task (probably)
     * shouldn't be run yet.) If the pool is shut down while the task
     * is being added, cancel and remove it if required by state and
     * run-after-shutdown parameters.
     *
     * @param task the task
     */
    private void delayedExecute(V)(RunnableScheduledFuture!V task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(task) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

    /**
     * Requeues a periodic task unless current run state precludes it.
     * Same idea as delayedExecute except drops task rather than rejecting.
     *
     * @param task the task
     */
    void reExecutePeriodic(V)(RunnableScheduledFuture!V task) {
        if (canRunInCurrentRunState(task)) {
            super.getQueue().add(task);
            if (canRunInCurrentRunState(task) || !remove(task)) {
                ensurePrestart();
                return;
            }
        }
        task.cancel(false);
    }

    /**
     * Cancels and clears the queue of all tasks that should not be run
     * due to shutdown policy. Invoked within super.shutdown.
     */
    override void onShutdown() {
        BlockingQueue!(Runnable) q = super.getQueue();
        bool keepDelayed =
            getExecuteExistingDelayedTasksAfterShutdownPolicy();
        bool keepPeriodic =
            getContinueExistingPeriodicTasksAfterShutdownPolicy();
        // Traverse snapshot to avoid iterator exceptions
        // TODO: implement and use efficient removeIf
        // super.getQueue().removeIf(...);
        version(HUNT_DEBUG) tracef("Shutting down..., BlockingQueue size: %d", q.size());
        foreach (Runnable e ; q.toArray()) {
            if(e is null) {
                warning("e is null");
            } else {
                version(HUNT_DEBUG) trace(typeid(cast(Object)e));
                IRunnableScheduledFuture t = cast(IRunnableScheduledFuture)e;
                if (t !is null) {
                    if ((t.isPeriodic()
                            ? !keepPeriodic
                            : (!keepDelayed && t.getDelay() > Duration.zero))
                        || t.isCancelled()) { // also remove if already cancelled
                        if (q.remove(t))
                            t.cancel(false);
                    }
                } else {
                    warning("t is null");
                }
            }
        }
        tryTerminate();
    }
    /**
     * Modifies or replaces the task used to execute a runnable.
     * This method can be used to override the concrete
     * class used for managing internal tasks.
     * The default implementation simply returns the given task.
     *
     * @param runnable the submitted Runnable
     * @param task the task created to execute the runnable
     * @param (V) the type of the task's result
     * @return a task that can execute the runnable
     */
    protected RunnableScheduledFuture!(V) decorateTask(V)(
        Runnable runnable, RunnableScheduledFuture!(V) task) {
        return task;
    }

    /**
     * Modifies or replaces the task used to execute a callable.
     * This method can be used to override the concrete
     * class used for managing internal tasks.
     * The default implementation simply returns the given task.
     *
     * @param callable the submitted Callable
     * @param task the task created to execute the callable
     * @param (V) the type of the task's result
     * @return a task that can execute the callable
     */
    protected RunnableScheduledFuture!(V) decorateTask(V)(
        Callable!(V) callable, RunnableScheduledFuture!(V) task) {
        return task;
    }

    /**
     * The default keep-alive time for pool threads.
     *
     * Normally, this value is unused because all pool threads will be
     * core threads, but if a user creates a pool with a corePoolSize
     * of zero (against our advice), we keep a thread alive as long as
     * there are queued tasks. If the keep alive time is zero (the
     * historic value), we end up hot-spinning in getTask, wasting a
     * CPU. But on the other hand, if we set the value too high, and
     * users create a one-shot pool which they don't cleanly shutdown,
     * the pool's non-daemon threads will prevent JVM termination. A
     * small but non-zero value (relative to a JVM's lifetime) seems
     * best.
     */
    private enum long DEFAULT_KEEPALIVE_MILLIS = 10L;

    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given core pool size.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    this(int corePoolSize) {
        super(corePoolSize, int.max, dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE_MILLIS),
              new DelayedWorkQueue());
    }
    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given initial parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if {@code threadFactory} is null
     */
    this(int corePoolSize, ThreadFactory threadFactory) {
        super(corePoolSize, int.max,
              dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE_MILLIS),
              new DelayedWorkQueue(), threadFactory);
    }

    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given initial parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if {@code handler} is null
     */
    this(int corePoolSize, RejectedExecutionHandler handler) {
        super(corePoolSize, int.max,
              dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE_MILLIS),
              new DelayedWorkQueue(), handler);
    }

    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given initial parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if {@code threadFactory} or
     *         {@code handler} is null
     */
    this(int corePoolSize, ThreadFactory threadFactory,
         RejectedExecutionHandler handler) {
        super(corePoolSize, int.max,
              dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE_MILLIS),
              new DelayedWorkQueue(), threadFactory, handler);
    }
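
    /*
     * Usage sketch (illustrative only, not part of this module): a caller
     * builds a small pool with the constructors above and submits a one-shot
     * delayed task. The PrintTask class is made up for the example.
     *
     * <pre> {@code
     * import hunt.concurrency.ScheduledThreadPoolExecutor;
     * import hunt.logging;
     * import hunt.util.Runnable;
     * import core.time;
     *
     * class PrintTask : Runnable {
     *     void run() {
     *         trace("fired roughly 500 msecs after submission");
     *     }
     * }
     *
     * auto executor = new ScheduledThreadPoolExecutor(2);
     * auto future = executor.schedule(new PrintTask(), 500.msecs);
     * // ... later
     * executor.shutdown();
     * }</pre>
     */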
    /**
     * Returns the trigger time of a delayed action, in hnsecs on the
     * Clock.currStdTime time base.
     */
    private long triggerTime(Duration delay) {
        return triggerTime(delay.isNegative ? 0 : delay.total!(TimeUnit.HectoNanosecond)());
    }

    /**
     * Returns the trigger time of a delayed action, in hnsecs on the
     * Clock.currStdTime time base.
     */
    long triggerTime(long delay) {
        return Clock.currStdTime +
            ((delay < (long.max >> 1)) ? delay : overflowFree(delay));
    }

    /**
     * Constrains the values of all delays in the queue to be within
     * long.max of each other, to avoid overflow in opCmp.
     * This may occur if a task is eligible to be dequeued, but has
     * not yet been, while some other task is added with a delay of
     * long.max.
     */
    private long overflowFree(long delay) {
        Delayed head = cast(Delayed) super.getQueue().peek();
        if (head !is null) {
            long headDelay = head.getDelay().total!(TimeUnit.HectoNanosecond)();
            if (headDelay < 0 && (delay - headDelay < 0))
                delay = long.max + headDelay;
        }
        return delay;
    }
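
    /*
     * Worked example for overflowFree (illustrative numbers): suppose the
     * queue head is 100 hnsecs overdue (headDelay == -100) and a new task
     * arrives with delay == long.max. The heap comparison would need
     * delay - headDelay == long.max + 100, which overflows a signed long.
     * Clamping the new delay to long.max + headDelay == long.max - 100 keeps
     * every pairwise difference representable, so opCmp stays correct.
     */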
    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    ScheduledFuture!(void) schedule(Runnable command, Duration delay) {
        if (command is null)
            throw new NullPointerException();
        // Emulates AtomicLong.getAndIncrement() on the shared sequencer.
        long n = atomicOp!"+="(sequencer, 1);
        n--;
        // RunnableScheduledFuture!(void) t = decorateTask(command,
        //     new ScheduledFutureTask!(void)(command, cast(void)null, triggerTime(delay), n, this));
        RunnableScheduledFuture!(void) t = decorateTask(command,
            new ScheduledFutureTask!(void)(command, triggerTime(delay), n, this));
        delayedExecute!(void)(t);
        return t;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    ScheduledFuture!(V) schedule(V)(Callable!(V) callable, Duration delay) {
        if (callable is null)
            throw new NullPointerException();
        RunnableScheduledFuture!(V) t = decorateTask(callable,
            new ScheduledFutureTask!(V)(callable,
                triggerTime(delay),
                cast(long)AtomicHelper.getAndIncrement(sequencer), this));
        delayedExecute(t);
        return t;
    }

    /**
     * Submits a periodic action that becomes enabled first after the
     * given initial delay, and subsequently with the given period;
     * that is, executions will commence after
     * {@code initialDelay}, then {@code initialDelay + period}, then
     * {@code initialDelay + 2 * period}, and so on.
     *
     * <p>The sequence of task executions continues indefinitely until
     * one of the following exceptional completions occurs:
     * <ul>
     * <li>The task is {@linkplain Future#cancel explicitly cancelled}
     * via the returned future.
     * <li>Method {@link #shutdown} is called and the {@linkplain
     * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
     * whether to continue after shutdown} is not set true, or method
     * {@link #shutdownNow} is called; also resulting in task
     * cancellation.
     * <li>An execution of the task throws an exception. In this case
     * calling {@link Future#get() get} on the returned future will throw
     * {@link ExecutionException}, holding the exception as its cause.
     * </ul>
     * Subsequent executions are suppressed. Subsequent calls to
     * {@link Future#isDone isDone()} on the returned future will
     * return {@code true}.
     *
     * <p>If any execution of this task takes longer than its period, then
     * subsequent executions may start late, but will not concurrently
     * execute.
     *
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     * @throws IllegalArgumentException {@inheritDoc}
     */
    ScheduledFuture!void scheduleAtFixedRate(Runnable command,
                                             Duration initialDelay,
                                             Duration period) {
        if (command is null)
            throw new NullPointerException();
        if (period <= Duration.zero)
            throw new IllegalArgumentException();

        ScheduledFutureTask!(void) sft =
            new ScheduledFutureTask!(void)(command, // cast(void)null,
                triggerTime(initialDelay),
                period.total!(TimeUnit.HectoNanosecond)(),
                cast(long)AtomicHelper.getAndIncrement(sequencer), this);
        RunnableScheduledFuture!(void) t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

    /**
     * Submits a periodic action that becomes enabled first after the
     * given initial delay, and subsequently with the given delay
     * between the termination of one execution and the commencement of
     * the next.
     *
     * <p>The sequence of task executions continues indefinitely until
     * one of the following exceptional completions occurs:
     * <ul>
     * <li>The task is {@linkplain Future#cancel explicitly cancelled}
     * via the returned future.
     * <li>Method {@link #shutdown} is called and the {@linkplain
     * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
     * whether to continue after shutdown} is not set true, or method
     * {@link #shutdownNow} is called; also resulting in task
     * cancellation.
     * <li>An execution of the task throws an exception. In this case
     * calling {@link Future#get() get} on the returned future will throw
     * {@link ExecutionException}, holding the exception as its cause.
     * </ul>
     * Subsequent executions are suppressed. Subsequent calls to
     * {@link Future#isDone isDone()} on the returned future will
     * return {@code true}.
     *
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     * @throws IllegalArgumentException {@inheritDoc}
     */
    ScheduledFuture!(void) scheduleWithFixedDelay(Runnable command,
                                                  Duration initialDelay,
                                                  Duration delay) {
        if (command is null)
            throw new NullPointerException();
        if (delay <= Duration.zero)
            throw new IllegalArgumentException();
        ScheduledFutureTask!(void) sft =
            new ScheduledFutureTask!(void)(command, // cast(void)null,
                triggerTime(initialDelay),
                -delay.total!(TimeUnit.HectoNanosecond)(),
                cast(long)AtomicHelper.getAndIncrement(sequencer), this);
        RunnableScheduledFuture!(void) t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }
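
    /*
     * Illustrative sketch (not part of this module) contrasting the two
     * periodic flavours above; "executor" and the Runnable "task" are assumed
     * to be defined by the caller.
     *
     * <pre> {@code
     * // Fixed rate: runs are targeted at initialDelay, initialDelay + 1s,
     * // initialDelay + 2s, ... after submission. Runs never overlap; a slow
     * // run only pushes later ones back.
     * executor.scheduleAtFixedRate(task, 100.msecs, 1.seconds);
     *
     * // Fixed delay: each run starts 1s after the previous run finishes,
     * // so the effective period is (execution time + 1s).
     * executor.scheduleWithFixedDelay(task, 100.msecs, 1.seconds);
     * }</pre>
     */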
    /**
     * Executes {@code command} with zero required delay.
     * This has effect equivalent to
     * {@link #schedule(Runnable,Duration) schedule(command, Duration.zero)}.
     * Note that inspections of the queue and of the list returned by
     * {@code shutdownNow} will access the zero-delayed
     * {@link ScheduledFuture}, not the {@code command} itself.
     *
     * <p>A consequence of the use of {@code ScheduledFuture} objects is
     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
     * called with a null second {@code Throwable} argument, even if the
     * {@code command} terminated abruptly. Instead, the {@code Throwable}
     * thrown by such a task can be obtained via {@link Future#get}.
     *
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution because the
     *         executor has been shut down
     * @throws NullPointerException {@inheritDoc}
     */
    override void execute(Runnable command) {
        schedule(command, Duration.zero);
    }

    // Override AbstractExecutorService methods

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    override Future!void submit(Runnable task) {
        return schedule(task, Duration.zero);
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    Future!(T) submit(T)(Runnable task, T result) {
        return schedule(Executors.callable(task, result), Duration.zero);
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    Future!(T) submit(T)(Callable!(T) task) {
        return schedule(task, Duration.zero);
    }

    /**
     * Sets the policy on whether to continue executing existing
     * periodic tasks even when this executor has been {@code shutdown}.
     * In this case, executions will continue until {@code shutdownNow}
     * or the policy is set to {@code false} when already shutdown.
     * This value is by default {@code false}.
     *
     * @param value if {@code true}, continue after shutdown, else don't
     * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
     */
    void setContinueExistingPeriodicTasksAfterShutdownPolicy(bool value) {
        continueExistingPeriodicTasksAfterShutdown = value;
        if (!value && isShutdown())
            onShutdown();
    }

    /**
     * Gets the policy on whether to continue executing existing
     * periodic tasks even when this executor has been {@code shutdown}.
     * In this case, executions will continue until {@code shutdownNow}
     * or the policy is set to {@code false} when already shutdown.
     * This value is by default {@code false}.
     *
     * @return {@code true} if will continue after shutdown
     * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
     */
    bool getContinueExistingPeriodicTasksAfterShutdownPolicy() {
        return continueExistingPeriodicTasksAfterShutdown;
    }

    /**
     * Sets the policy on whether to execute existing delayed
     * tasks even when this executor has been {@code shutdown}.
     * In this case, these tasks will only terminate upon
     * {@code shutdownNow}, or after setting the policy to
     * {@code false} when already shutdown.
     * This value is by default {@code true}.
     *
     * @param value if {@code true}, execute after shutdown, else don't
     * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
     */
    void setExecuteExistingDelayedTasksAfterShutdownPolicy(bool value) {
        executeExistingDelayedTasksAfterShutdown = value;
        if (!value && isShutdown())
            onShutdown();
    }

    /**
     * Gets the policy on whether to execute existing delayed
     * tasks even when this executor has been {@code shutdown}.
     * In this case, these tasks will only terminate upon
     * {@code shutdownNow}, or after setting the policy to
     * {@code false} when already shutdown.
     * This value is by default {@code true}.
     *
     * @return {@code true} if will execute after shutdown
     * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
     */
    bool getExecuteExistingDelayedTasksAfterShutdownPolicy() {
        return executeExistingDelayedTasksAfterShutdown;
    }

    /**
     * Sets the policy on whether cancelled tasks should be immediately
     * removed from the work queue at time of cancellation. This value is
     * by default {@code false}.
     *
     * @param value if {@code true}, remove on cancellation, else don't
     * @see #getRemoveOnCancelPolicy
     */
    void setRemoveOnCancelPolicy(bool value) {
        removeOnCancel = value;
    }

    /**
     * Gets the policy on whether cancelled tasks should be immediately
     * removed from the work queue at time of cancellation. This value is
     * by default {@code false}.
     *
     * @return {@code true} if cancelled tasks are immediately removed
     *         from the queue
     * @see #setRemoveOnCancelPolicy
     */
    bool getRemoveOnCancelPolicy() {
        return removeOnCancel;
    }
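
    /*
     * Illustrative sketch of the shutdown-related policies above (not part of
     * this module); "executor" is assumed to exist in the caller.
     *
     * <pre> {@code
     * // Drop cancelled tasks from the queue right away instead of letting
     * // them linger until their delay elapses.
     * executor.setRemoveOnCancelPolicy(true);
     *
     * // After shutdown(): keep running already-scheduled periodic tasks,
     * // but cancel one-shot tasks whose delay has not yet elapsed.
     * executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(true);
     * executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
     * executor.shutdown();
     * }</pre>
     */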
    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution. Use {@link #awaitTermination awaitTermination}
     * to do that.
     *
     * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
     * has been set {@code false}, existing delayed tasks whose delays
     * have not yet elapsed are cancelled. And unless the {@code
     * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
     * {@code true}, future executions of existing periodic tasks will
     * be cancelled.
     *
     * @throws SecurityException {@inheritDoc}
     */
    override void shutdown() {
        super.shutdown();
    }

    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate. Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks. This implementation
     * interrupts tasks via {@link Thread#interrupt}; any task that
     * fails to respond to interrupts may never terminate.
     *
     * @return list of tasks that never commenced execution.
     *         Each element of this list is a {@link ScheduledFuture}.
     *         For tasks submitted via one of the {@code schedule}
     *         methods, the element will be identical to the returned
     *         {@code ScheduledFuture}. For tasks submitted using
     *         {@link #execute execute}, the element will be a
     *         zero-delay {@code ScheduledFuture}.
     * @throws SecurityException {@inheritDoc}
     */
    override List!(Runnable) shutdownNow() {
        return super.shutdownNow();
    }

    /**
     * Returns the task queue used by this executor. Access to the
     * task queue is intended primarily for debugging and monitoring.
     * This queue may be in active use. Retrieving the task queue
     * does not prevent queued tasks from executing.
     *
     * <p>Each element of this queue is a {@link ScheduledFuture}.
     * For tasks submitted via one of the {@code schedule} methods, the
     * element will be identical to the returned {@code ScheduledFuture}.
     * For tasks submitted using {@link #execute execute}, the element
     * will be a zero-delay {@code ScheduledFuture}.
     *
     * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
     * tasks in the order in which they will execute.
     *
     * @return the task queue
     */
    override BlockingQueue!(Runnable) getQueue() {
        return super.getQueue();
    }
}


/**
 * The task type used internally for all commands submitted to a
 * {@link ScheduledThreadPoolExecutor}.
 */
private class ScheduledFutureTask(V) : FutureTask!(V),
    RunnableScheduledFuture!(V), IScheduledFutureTask {

    /** Sequence number to break ties FIFO */
    private long sequenceNumber;

    /** The hnsecs-based time when the task is enabled to execute. */
    private long time;

    /**
     * Period for repeating tasks, in hnsecs.
     * A positive value indicates fixed-rate execution.
     * A negative value indicates fixed-delay execution.
     * A value of 0 indicates a non-repeating (one-shot) task.
     */
    private long period;

    /** The actual task to be re-enqueued by reExecutePeriodic */
    RunnableScheduledFuture!(V) outerTask; // = this;
    ScheduledThreadPoolExecutor poolExecutor;

    /**
     * Index into delay queue, to support faster cancellation.
     */
    int _heapIndex;

    static if(is(V == void)) {
        /**
         * Creates a one-shot action with given hnsecs-based trigger time.
         */
        this(Runnable r, long triggerTime,
             long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) {
            super(r);
            this.time = triggerTime;
            this.period = 0;
            this.sequenceNumber = sequenceNumber;
            this.poolExecutor = poolExecutor;
            initializeMembers();
        }

        /**
         * Creates a periodic action with given hnsecs-based initial
         * trigger time and period.
         */
        this(Runnable r, long triggerTime,
             long period, long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) {
            super(r);
            this.time = triggerTime;
            this.period = period;
            this.sequenceNumber = sequenceNumber;
            this.poolExecutor = poolExecutor;
            initializeMembers();
        }
    } else {

        /**
         * Creates a one-shot action with given hnsecs-based trigger time.
         */
        this(Runnable r, V result, long triggerTime,
             long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) {
            super(r, result);
            this.time = triggerTime;
            this.period = 0;
            this.sequenceNumber = sequenceNumber;
            this.poolExecutor = poolExecutor;
            initializeMembers();
        }

        /**
         * Creates a periodic action with given hnsecs-based initial
         * trigger time and period.
         */
        this(Runnable r, V result, long triggerTime,
             long period, long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) {
            super(r, result);
            this.time = triggerTime;
            this.period = period;
            this.sequenceNumber = sequenceNumber;
            this.poolExecutor = poolExecutor;
            initializeMembers();
        }
    }

    /**
     * Creates a one-shot action with given hnsecs-based trigger time.
     */
    this(Callable!(V) callable, long triggerTime,
         long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) {
        super(callable);
        this.time = triggerTime;
        this.period = 0;
        this.sequenceNumber = sequenceNumber;
        this.poolExecutor = poolExecutor;
        initializeMembers();
    }

    private void initializeMembers() {
        outerTask = this;
    }

    void heapIndex(int index) {
        _heapIndex = index;
    }

    int heapIndex() {
        return _heapIndex;
    }

    Duration getDelay() {
        return dur!(TimeUnit.HectoNanosecond)(time - Clock.currStdTime());
    }

    int opCmp(Delayed other) {
        if (other is this) // compare zero if same object
            return 0;
        ScheduledFutureTask!V x = cast(ScheduledFutureTask!V)other;
        if (x !is null) {
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
            else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        Duration diff = getDelay() - other.getDelay();
        return (diff.isNegative) ? -1 : (diff > Duration.zero) ? 1 : 0;
    }

    /**
     * Returns {@code true} if this is a periodic (not a one-shot) action.
     *
     * @return {@code true} if periodic
     */
    bool isPeriodic() {
        return period != 0;
    }

    /**
     * Sets the next time to run for a periodic task.
     */
    private void setNextRunTime() {
        long p = period;
        if (p > 0)
            time += p;
        else
            time = poolExecutor.triggerTime(-p);
    }

    override bool cancel(bool mayInterruptIfRunning) {
        // The racy read of heapIndex below is benign:
        // if heapIndex < 0, then OOTA guarantees that we have surely
        // been removed; else we recheck under lock in remove()
        bool cancelled = super.cancel(mayInterruptIfRunning);
        if (cancelled && poolExecutor.removeOnCancel && heapIndex >= 0)
            poolExecutor.remove(this);
        return cancelled;
    }

    /**
     * Overrides FutureTask version so as to reset/requeue if periodic.
     */
    override void run() {
        if (!poolExecutor.canRunInCurrentRunState(this))
            cancel(false);
        else if (!isPeriodic())
            super.run();
        else if (super.runAndReset()) {
            setNextRunTime();
            poolExecutor.reExecutePeriodic(outerTask);
        }
    }

    // alias from FutureTask
    // alias isCancelled = FutureTask!V.isCancelled;
    // alias isDone = FutureTask!V.isDone;
    alias get = FutureTask!V.get;

    override bool isCancelled() {
        return super.isCancelled();
    }

    override bool isDone() {
        return super.isDone();
    }

    override V get() {
        return super.get();
    }

    override V get(Duration timeout) {
        return super.get(timeout);
    }
}

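/*
 * Illustrative timeline for ScheduledFutureTask.setNextRunTime (numbers made
 * up, in hnsecs): with period == +10_000 (fixed rate) a task first triggered
 * at time T is retriggered at T + 10_000, T + 20_000, ... no matter how long
 * each run takes. With period == -10_000 (fixed delay) the next trigger time
 * is poolExecutor.triggerTime(10_000), i.e. 10_000 hnsecs after the finished
 * run was requeued.
 */
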
/**
 * Specialized delay queue. To mesh with TPE declarations, this
 * class must be declared as a BlockingQueue!(Runnable) even though
 * it can only hold RunnableScheduledFutures.
 */
class DelayedWorkQueue : AbstractQueue!(Runnable), BlockingQueue!(Runnable) {

    /*
     * A DelayedWorkQueue is based on a heap-based data structure
     * like those in DelayQueue and PriorityQueue, except that
     * every ScheduledFutureTask also records its index into the
     * heap array. This eliminates the need to find a task upon
     * cancellation, greatly speeding up removal (down from O(n)
     * to O(log n)), and reducing garbage retention that would
     * otherwise occur by waiting for the element to rise to top
     * before clearing. But because the queue may also hold
     * RunnableScheduledFutures that are not ScheduledFutureTasks,
     * we are not guaranteed to have such indices available, in
     * which case we fall back to linear search. (We expect that
     * most tasks will not be decorated, and that the faster cases
     * will be much more common.)
     *
     * All heap operations must record index changes -- mainly
     * within siftUp and siftDown. Upon removal, a task's
     * heapIndex is set to -1. Note that ScheduledFutureTasks can
     * appear at most once in the queue (this need not be true for
     * other kinds of tasks or work queues), so are uniquely
     * identified by heapIndex.
     */

    private enum int INITIAL_CAPACITY = 16;
    private IRunnableScheduledFuture[] queue;
    private ReentrantLock lock;
    private int _size;

    /**
     * Thread designated to wait for the task at the head of the
     * queue. This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting. When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely. The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim. Whenever the head of the queue is replaced with a
     * task with an earlier expiration time, the leader field is
     * invalidated by being reset to null, and some waiting
     * thread, but not necessarily the current leader, is
     * signalled. So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     */
    private ThreadEx leader;

    /**
     * Condition signalled when a newer task becomes available at the
     * head of the queue or a new thread may need to become leader.
     */
    private Condition available;

    this() {
        initializeMembers();
    }

    private void initializeMembers() {
        lock = new ReentrantLock();
        available = new Condition(lock);
        queue = new IRunnableScheduledFuture[INITIAL_CAPACITY];
    }

    /**
     * Sets f's heapIndex if it is a ScheduledFutureTask.
     */
    private static void setIndex(IRunnableScheduledFuture f, int idx) {
        IScheduledFutureTask t = cast(IScheduledFutureTask)f;
        // tracef("index=%d, type: %s", idx, typeid(cast(Object)t));
        if (t !is null)
            t.heapIndex = idx;
    }

    /**
     * Sifts element added at bottom up to its heap-ordered spot.
     * Call only when holding lock.
     */
    private void siftUp(int k, IRunnableScheduledFuture key) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            IRunnableScheduledFuture e = queue[parent];
            if (key.opCmp(e) >= 0)
                break;
            queue[k] = e;
            setIndex(e, k);
            k = parent;
        }
        // tracef("k=%d, key is null: %s", k, key is null);
        queue[k] = key;
        setIndex(key, k);
    }
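
    /*
     * Heap layout used by siftUp/siftDown (illustrative, 7 elements):
     *
     *              0
     *            /   \
     *           1     2
     *          / \   / \
     *         3   4 5   6
     *
     * parent(k) == (k - 1) >>> 1 and the children of k are 2k + 1 and
     * 2k + 2, so the task with the earliest trigger time is always queue[0].
     */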
    /**
     * Sifts element added at top down to its heap-ordered spot.
     * Call only when holding lock.
     */
    private void siftDown(int k, IRunnableScheduledFuture key) {
        int half = size >>> 1;
        while (k < half) {
            int child = (k << 1) + 1;
            IRunnableScheduledFuture c = queue[child];
            int right = child + 1;
            if (right < size && c.opCmp(queue[right]) > 0)
                c = queue[child = right];
            if (key.opCmp(c) <= 0)
                break;
            queue[k] = c;
            setIndex(c, k);
            k = child;
        }
        queue[k] = key;
        setIndex(key, k);
    }

    /**
     * Resizes the heap array. Call only when holding lock.
     */
    private void grow() {
        size_t oldCapacity = queue.length;
        size_t newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
        if (newCapacity > int.max) // overflow
            newCapacity = int.max;
        queue.length = newCapacity;
    }

    /**
     * Finds index of given object, or -1 if absent.
     */
    private int indexOf(Runnable x) {
        if (x !is null) {
            IScheduledFutureTask sf = cast(IScheduledFutureTask) x;
            if (sf !is null) {
                int i = sf.heapIndex;
                // Sanity check; x could conceivably be a
                // ScheduledFutureTask from some other pool.
                if (i >= 0 && i < size && queue[i] == x)
                    return i;
            } else {
                for (int i = 0; i < size; i++) {
                    // if (x.opEquals(cast(Object)queue[i]))
                    if(x is queue[i])
                        return i;
                }
            }
        }
        return -1;
    }

    override bool contains(Runnable x) {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return indexOf(x) != -1;
        } finally {
            lock.unlock();
        }
    }

    override bool remove(Runnable x) {
        ReentrantLock lock = this.lock;
        version(HUNT_DEBUG) trace(cast(Object)x);
        lock.lock();
        try {
            int i = indexOf(x);
            if (i < 0)
                return false;

            setIndex(queue[i], -1);
            int s = --_size;
            IRunnableScheduledFuture replacement = queue[s];
            queue[s] = null;
            if (s != i) {
                siftDown(i, replacement);
                if (queue[i] == replacement)
                    siftUp(i, replacement);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    override int size() {
        // return _size;
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return _size;
        } finally {
            lock.unlock();
        }
    }

    override bool isEmpty() {
        return size() == 0;
    }

    int remainingCapacity() {
        return int.max;
    }

    IRunnableScheduledFuture peek() {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return queue[0];
        } finally {
            lock.unlock();
        }
    }

    bool offer(Runnable x) {
        if (x is null)
            throw new NullPointerException();
        IRunnableScheduledFuture e = cast(IRunnableScheduledFuture)x;
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = _size;
            if (i >= queue.length)
                grow();
            _size = i + 1;
            if (i == 0) {
                queue[0] = e;
                setIndex(e, 0);
            } else {
                siftUp(i, e);
            }
            if (queue[0] == e) {
                leader = null;
                available.notify();
            }
        } finally {
            lock.unlock();
        }
        return true;
    }

    override void put(Runnable e) {
        offer(e);
    }

    override bool add(Runnable e) {
        return offer(e);
    }

    bool offer(Runnable e, Duration timeout) {
        return offer(e);
    }

    /**
     * Performs common bookkeeping for poll and take: Replaces
     * first element with last and sifts it down. Call only when
     * holding lock.
     * @param f the task to remove and return
     */
    private IRunnableScheduledFuture finishPoll(IRunnableScheduledFuture f) {
        int s = --_size;
        IRunnableScheduledFuture x = queue[s];
        queue[s] = null;
        if (s != 0)
            siftDown(0, x);
        setIndex(f, -1);
        return f;
    }

    IRunnableScheduledFuture poll() {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            IRunnableScheduledFuture first = queue[0];
            return (first is null || first.getDelay() > Duration.zero)
                ? null
                : finishPoll(first);
        } finally {
            lock.unlock();
        }
    }

    IRunnableScheduledFuture take() {
        ReentrantLock lock = this.lock;
        // lock.lockInterruptibly();
        lock.lock();
        try {
            for (;;) {
                IRunnableScheduledFuture first = queue[0];
                if (first is null)
                    available.wait();
                else {
                    Duration delay = first.getDelay();
                    if (delay <= Duration.zero)
                        return finishPoll(first);
                    first = null; // don't retain ref while waiting
                    if (leader !is null)
                        available.wait();
                    else {
                        ThreadEx thisThread = ThreadEx.currentThread();
                        leader = thisThread;
                        try {
                            available.wait(delay);
                        } finally {
                            if (leader is thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader is null && queue[0] !is null)
                available.notify();
            lock.unlock();
        }
    }

    IRunnableScheduledFuture poll(Duration timeout) {
        // long nanos = total!(TimeUnit.HectoNanosecond)(timeout);
        Duration nanos = timeout;
        ReentrantLock lock = this.lock;
        // lock.lockInterruptibly();
        lock.lock();
        try {
            for (;;) {
                IRunnableScheduledFuture first = queue[0];
                if (first is null) {
                    if (nanos <= Duration.zero)
                        return null;
                    else
                        available.wait(nanos); // nanos =
                } else {
                    Duration delay = first.getDelay();
                    if (delay <= Duration.zero)
                        return finishPoll(first);
                    if (nanos <= Duration.zero)
                        return null;
                    first = null; // don't retain ref while waiting
                    if (nanos < delay || leader !is null)
                        available.wait(nanos); // nanos =
                    else {
                        ThreadEx thisThread = ThreadEx.currentThread();
                        leader = thisThread;
                        try {
                            available.wait(delay);
                            nanos -= delay;
                            // long timeLeft = available.wait(delay);
                            // nanos -= delay - timeLeft;
                        } finally {
                            if (leader is thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader is null && queue[0] !is null)
                available.notify();
            lock.unlock();
        }
    }
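
    /*
     * Illustrative scenario for the Leader-Follower waiting in take() above
     * (timings made up): threads A and B both call take() while the head task
     * is 5 seconds away. A becomes leader and does a timed wait; B waits
     * untimed. If a task due in 1 second is then offered, offer() clears the
     * leader and signals, so one waiter re-reads the head and becomes leader
     * with a 1-second timed wait. When the delay elapses, the leader removes
     * the head and, on the way out, signals a follower so it can become the
     * next leader.
     */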
    override void clear() {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (int i = 0; i < size; i++) {
                IRunnableScheduledFuture t = queue[i];
                if (t !is null) {
                    queue[i] = null;
                    setIndex(t, -1);
                }
            }
            _size = 0;
        } finally {
            lock.unlock();
        }
    }

    int drainTo(Collection!(Runnable) c) {
        return drainTo(c, int.max);
    }

    int drainTo(Collection!(Runnable) c, int maxElements) {
        // Objects.requireNonNull(c);

        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            for (IRunnableScheduledFuture first;
                 n < maxElements
                     && (first = queue[0]) !is null
                     && first.getDelay() <= Duration.zero;) {
                c.add(first);   // In this order, in case add() throws.
                finishPoll(first);
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    override Runnable[] toArray() {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Runnable[] r = new Runnable[_size];
            for(int i = 0; i < _size; i++) {
                r[i] = queue[i];
            }
            return r;
        } finally {
            lock.unlock();
        }
    }

    override int opApply(scope int delegate(ref Runnable) dg) {
        if(dg is null)
            throw new NullPointerException();
        ReentrantLock lock = this.lock;
        lock.lock();
        scope(exit) lock.unlock();

        int result = 0;
        foreach(int i; 0.._size) {
            Runnable v = queue[i];
            result = dg(v);
            if(result != 0) return result;
        }
        return result;
    }


    // Iterator!(Runnable) iterator() {
    //     ReentrantLock lock = this.lock;
    //     lock.lock();
    //     try {
    //         return new Itr(Arrays.copyOf(queue, size));
    //     } finally {
    //         lock.unlock();
    //     }
    // }

    /**
     * Snapshot iterator that works off copy of underlying q array.
     */
    // private class Itr : Iterator!(Runnable) {
    //     final IRunnableScheduledFuture[] array;
    //     int cursor;        // index of next element to return; initially 0
    //     int lastRet = -1;  // index of last element returned; -1 if no such

    //     this(IRunnableScheduledFuture[] array) {
    //         this.array = array;
    //     }

    //     bool hasNext() {
    //         return cursor < array.length;
    //     }

    //     Runnable next() {
    //         if (cursor >= array.length)
    //             throw new NoSuchElementException();
    //         return array[lastRet = cursor++];
    //     }

    //     void remove() {
    //         if (lastRet < 0)
    //             throw new IllegalStateException();
    //         DelayedWorkQueue.this.remove(array[lastRet]);
    //         lastRet = -1;
    //     }
    // }

    override bool opEquals(IObject o) {
        return opEquals(cast(Object) o);
    }

    override bool opEquals(Object o) {
        return super.opEquals(o);
    }

    override string toString() {
        return super.toString();
    }

    override size_t toHash() @trusted nothrow {
        return super.toHash();
    }
}