1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.concurrency.ScheduledThreadPoolExecutor;
13 
14 import hunt.concurrency.atomic.AtomicHelper;
15 import hunt.concurrency.BlockingQueue;
16 import hunt.concurrency.Delayed;
17 import hunt.concurrency.Future;
18 import hunt.concurrency.FutureTask;
19 import hunt.concurrency.ScheduledExecutorService;
20 import hunt.concurrency.thread;
21 import hunt.concurrency.ThreadFactory;
22 import hunt.concurrency.ThreadPoolExecutor;
23 
24 import hunt.collection;
25 import hunt.Exceptions;
26 import hunt.util.Common;
27 import hunt.util.DateTime;
28 import hunt.util.Runnable;
29 import hunt.Object;
30 import hunt.logging;
31 // import core.time;
32 
33 import core.atomic;
34 import core.sync.condition;
35 import core.sync.mutex;
36 
37 import std.datetime;
38 // import hunt.collection.AbstractQueue;
39 // import java.util.Arrays;
40 // import hunt.collection.Collection;
41 // import hunt.collection.Iterator;
42 // import java.util.List;
43 // import java.util.NoSuchElementException;
44 // import java.util.Objects;
45 // import hunt.concurrency.atomic.AtomicLong;
46 // import hunt.concurrency.locks.Condition;
47 // import hunt.concurrency.locks.ReentrantLock;
48 
49 alias ReentrantLock = Mutex;
50 
51 interface IScheduledFutureTask {
52     void heapIndex(int index);
53     int heapIndex();
54 }
55 
56 /**
57  * A {@link ThreadPoolExecutor} that can additionally schedule
58  * commands to run after a given delay, or to execute periodically.
59  * This class is preferable to {@link java.util.Timer} when multiple
60  * worker threads are needed, or when the additional flexibility or
61  * capabilities of {@link ThreadPoolExecutor} (which this class
62  * extends) are required.
63  *
64  * <p>Delayed tasks execute no sooner than they are enabled, but
65  * without any real-time guarantees about when, after they are
66  * enabled, they will commence. Tasks scheduled for exactly the same
67  * execution time are enabled in first-in-first-out (FIFO) order of
68  * submission.
69  *
70  * <p>When a submitted task is cancelled before it is run, execution
71  * is suppressed.  By default, such a cancelled task is not
72  * automatically removed from the work queue until its delay elapses.
73  * While this enables further inspection and monitoring, it may also
74  * cause unbounded retention of cancelled tasks.  To avoid this, use
75  * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
76  * removed from the work queue at time of cancellation.
77  *
78  * <p>Successive executions of a periodic task scheduled via
79  * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
80  * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
81  * do not overlap. While different executions may be performed by
82  * different threads, the effects of prior executions
83  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
84  * those of subsequent ones.
85  *
86  * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
87  * of the inherited tuning methods are not useful for it. In
88  * particular, because it acts as a fixed-sized pool using
89  * {@code corePoolSize} threads and an unbounded queue, adjustments
90  * to {@code maximumPoolSize} have no useful effect. Additionally, it
91  * is almost never a good idea to set {@code corePoolSize} to zero or
92  * use {@code allowCoreThreadTimeOut} because this may leave the pool
93  * without threads to handle tasks once they become eligible to run.
94  *
95  * <p>As with {@code ThreadPoolExecutor}, if not otherwise specified,
96  * this class uses {@link Executors#defaultThreadFactory} as the
97  * default thread factory, and {@link ThreadPoolExecutor.AbortPolicy}
98  * as the default rejected execution handler.
99  *
100  * <p><b>Extension notes:</b> This class overrides the
101  * {@link ThreadPoolExecutor#execute(Runnable) execute} and
102  * {@link AbstractExecutorService#submit(Runnable) submit}
103  * methods to generate internal {@link ScheduledFuture} objects to
104  * control per-task delays and scheduling.  To preserve
105  * functionality, any further overrides of these methods in
106  * subclasses must invoke superclass versions, which effectively
107  * disables additional task customization.  However, this class
108  * provides alternative protected extension method
109  * {@code decorateTask} (one version each for {@code Runnable} and
110  * {@code Callable}) that can be used to customize the concrete task
111  * types used to execute commands entered via {@code execute},
112  * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
113  * and {@code scheduleWithFixedDelay}.  By default, a
114  * {@code ScheduledThreadPoolExecutor} uses a task type extending
115  * {@link FutureTask}. However, this may be modified or replaced using
116  * subclasses of the form:
117  *
118  * <pre> {@code
119  * class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
120  *
121  *   static class CustomTask!(V) : RunnableScheduledFuture!(V) { ... }
122  *
123  *   protected !(V) RunnableScheduledFuture!(V) decorateTask(
124  *                Runnable r, RunnableScheduledFuture!(V) task) {
125  *       return new CustomTask!(V)(r, task);
126  *   }
127  *
128  *   protected !(V) RunnableScheduledFuture!(V) decorateTask(
129  *                Callable!(V) c, RunnableScheduledFuture!(V) task) {
130  *       return new CustomTask!(V)(c, task);
131  *   }
132  *   // ... add constructors, etc.
133  * }}</pre>
134  *
135  * @author Doug Lea
136  */
137 class ScheduledThreadPoolExecutor : ThreadPoolExecutor, ScheduledExecutorService {
138 
139     /*
140      * This class specializes ThreadPoolExecutor implementation by
141      *
142      * 1. Using a custom task type ScheduledFutureTask, even for tasks
143      *    that don't require scheduling because they are submitted
144      *    using ExecutorService rather than ScheduledExecutorService
145      *    methods, which are treated as tasks with a delay of zero.
146      *
147      * 2. Using a custom queue (DelayedWorkQueue), a variant of
148      *    unbounded DelayQueue. The lack of capacity constraint and
149      *    the fact that corePoolSize and maximumPoolSize are
150      *    effectively identical simplifies some execution mechanics
151      *    (see delayedExecute) compared to ThreadPoolExecutor.
152      *
153      * 3. Supporting optional run-after-shutdown parameters, which
154      *    leads to overrides of shutdown methods to remove and cancel
155      *    tasks that should NOT be run after shutdown, as well as
156      *    different recheck logic when task (re)submission overlaps
157      *    with a shutdown.
158      *
159      * 4. Task decoration methods to allow interception and
160      *    instrumentation, which are needed because subclasses cannot
161      *    otherwise override submit methods to get this effect. These
162      *    don't have any impact on pool control logic though.
163      */
164 
165     /**
166      * False if should cancel/suppress periodic tasks on shutdown.
167      */
168     private bool continueExistingPeriodicTasksAfterShutdown;
169 
170     /**
171      * False if should cancel non-periodic not-yet-expired tasks on shutdown.
172      */
173     private bool executeExistingDelayedTasksAfterShutdown = true;
174 
175     /**
176      * True if ScheduledFutureTask.cancel should remove from queue.
177      */
178     bool removeOnCancel;
179 
180     /**
181      * Sequence number to break scheduling ties, and in turn to
182      * guarantee FIFO order among tied entries.
183      */
184     private shared static long sequencer; //= new AtomicLong();
185 
186     /**
187      * Returns true if can run a task given current run state and
188      * run-after-shutdown parameters.
189      */
190     bool canRunInCurrentRunState(V)(RunnableScheduledFuture!V task) {
191         if (!isShutdown())
192             return true;
193         if (isStopped())
194             return false;
195         return task.isPeriodic()
196             ? continueExistingPeriodicTasksAfterShutdown
197             : (executeExistingDelayedTasksAfterShutdown
198                || task.getDelay() <= Duration.zero);
199     }
200 
201     /**
202      * Main execution method for delayed or periodic tasks.  If pool
203      * is shut down, rejects the task. Otherwise adds task to queue
204      * and starts a thread, if necessary, to run it.  (We cannot
205      * prestart the thread to run the task because the task (probably)
206      * shouldn't be run yet.)  If the pool is shut down while the task
207      * is being added, cancel and remove it if required by state and
208      * run-after-shutdown parameters.
209      *
210      * @param task the task
211      */
212     private void delayedExecute(V)(RunnableScheduledFuture!V task) {
213         if (isShutdown())
214             reject(task);
215         else {
216             super.getQueue().add(task);
217             if (!canRunInCurrentRunState(task) && remove(task))
218                 task.cancel(false);
219             else
220                 ensurePrestart();
221         }
222     }
223 
224     /**
225      * Requeues a periodic task unless current run state precludes it.
226      * Same idea as delayedExecute except drops task rather than rejecting.
227      *
228      * @param task the task
229      */
230     void reExecutePeriodic(V)(RunnableScheduledFuture!V task) {
231         if (canRunInCurrentRunState(task)) {
232             super.getQueue().add(task);
233             if (canRunInCurrentRunState(task) || !remove(task)) {
234                 ensurePrestart();
235                 return;
236             }
237         }
238         task.cancel(false);
239     }
240 
241     /**
242      * Cancels and clears the queue of all tasks that should not be run
243      * due to shutdown policy.  Invoked within super.shutdown.
244      */
245     override void onShutdown() {
246         BlockingQueue!(Runnable) q = super.getQueue();
247         bool keepDelayed =
248             getExecuteExistingDelayedTasksAfterShutdownPolicy();
249         bool keepPeriodic =
250             getContinueExistingPeriodicTasksAfterShutdownPolicy();
251         // Traverse snapshot to avoid iterator exceptions
252         // TODO: implement and use efficient removeIf
253         // super.getQueue().removeIf(...);
254         version(HUNT_DEBUG) tracef("Shuting down..., BlockingQueue size: %d", q.size());
255         foreach (Runnable e ; q.toArray()) {
256             if(e is null) {
257                 warning("e is null");
258             } else {
259                 version(HUNT_DEBUG) trace(typeid(cast(Object)e));
260                 IRunnableScheduledFuture t = cast(IRunnableScheduledFuture)e;
261                 if (t !is null) {
262                     if ((t.isPeriodic()
263                          ? !keepPeriodic
264                          : (!keepDelayed && t.getDelay() > Duration.zero))
265                         || t.isCancelled()) { // also remove if already cancelled
266                         if (q.remove(t))
267                             t.cancel(false);
268                     }
269                 } else {
270                     warning("t is null");
271                 }
272             }
273                 
274         }
275         tryTerminate();
276     }
277 
278     /**
279      * Modifies or replaces the task used to execute a runnable.
280      * This method can be used to override the concrete
281      * class used for managing internal tasks.
282      * The default implementation simply returns the given task.
283      *
284      * @param runnable the submitted Runnable
285      * @param task the task created to execute the runnable
286      * @param (V) the type of the task's result
287      * @return a task that can execute the runnable
288      */
289     protected RunnableScheduledFuture!(V) decorateTask(V) (
290         Runnable runnable, RunnableScheduledFuture!(V) task) {
291         return task;
292     }
293 
294     /**
295      * Modifies or replaces the task used to execute a callable.
296      * This method can be used to override the concrete
297      * class used for managing internal tasks.
298      * The default implementation simply returns the given task.
299      *
300      * @param callable the submitted Callable
301      * @param task the task created to execute the callable
302      * @param (V) the type of the task's result
303      * @return a task that can execute the callable
304      */
305     protected RunnableScheduledFuture!(V) decorateTask(V)(
306         Callable!(V) callable, RunnableScheduledFuture!(V) task) {
307         return task;
308     }
309 
310     /**
311      * The default keep-alive time for pool threads.
312      *
313      * Normally, this value is unused because all pool threads will be
314      * core threads, but if a user creates a pool with a corePoolSize
315      * of zero (against our advice), we keep a thread alive as long as
316      * there are queued tasks.  If the keep alive time is zero (the
317      * historic value), we end up hot-spinning in getTask, wasting a
318      * CPU.  But on the other hand, if we set the value too high, and
319      * users create a one-shot pool which they don't cleanly shutdown,
320      * the pool's non-daemon threads will prevent JVM termination.  A
321      * small but non-zero value (relative to a JVM's lifetime) seems
322      * best.
323      */
324     private enum long DEFAULT_KEEPALIVE_MILLIS = 10L;
325 
326     /**
327      * Creates a new {@code ScheduledThreadPoolExecutor} with the
328      * given core pool size.
329      *
330      * @param corePoolSize the number of threads to keep in the pool, even
331      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
332      * @throws IllegalArgumentException if {@code corePoolSize < 0}
333      */
334     this(int corePoolSize) {
335         super(corePoolSize, int.max, dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE_MILLIS),
336               new DelayedWorkQueue());
337     }
338 
339     /**
340      * Creates a new {@code ScheduledThreadPoolExecutor} with the
341      * given initial parameters.
342      *
343      * @param corePoolSize the number of threads to keep in the pool, even
344      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
345      * @param threadFactory the factory to use when the executor
346      *        creates a new thread
347      * @throws IllegalArgumentException if {@code corePoolSize < 0}
348      * @throws NullPointerException if {@code threadFactory} is null
349      */
350     this(int corePoolSize, ThreadFactory threadFactory) {
351         super(corePoolSize, int.max,
352               dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE_MILLIS),
353               new DelayedWorkQueue(), threadFactory);
354     }
355 
356     /**
357      * Creates a new {@code ScheduledThreadPoolExecutor} with the
358      * given initial parameters.
359      *
360      * @param corePoolSize the number of threads to keep in the pool, even
361      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
362      * @param handler the handler to use when execution is blocked
363      *        because the thread bounds and queue capacities are reached
364      * @throws IllegalArgumentException if {@code corePoolSize < 0}
365      * @throws NullPointerException if {@code handler} is null
366      */
367     this(int corePoolSize, RejectedExecutionHandler handler) {
368         super(corePoolSize, int.max,
369               dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE_MILLIS),
370               new DelayedWorkQueue(), handler);
371     }
372 
373     /**
374      * Creates a new {@code ScheduledThreadPoolExecutor} with the
375      * given initial parameters.
376      *
377      * @param corePoolSize the number of threads to keep in the pool, even
378      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
379      * @param threadFactory the factory to use when the executor
380      *        creates a new thread
381      * @param handler the handler to use when execution is blocked
382      *        because the thread bounds and queue capacities are reached
383      * @throws IllegalArgumentException if {@code corePoolSize < 0}
384      * @throws NullPointerException if {@code threadFactory} or
385      *         {@code handler} is null
386      */
387     this(int corePoolSize, ThreadFactory threadFactory,
388                                        RejectedExecutionHandler handler) {
389         super(corePoolSize, int.max,
390               dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE_MILLIS),
391               new DelayedWorkQueue(), threadFactory, handler);
392     }
393 
394     /**
395      * Returns the nanoTime-based trigger time of a delayed action.
396      */
397     private long triggerTime(Duration delay) {
398         return triggerTime(delay.isNegative ? 0 : delay.total!(TimeUnit.HectoNanosecond)());
399     }
400 
401     /**
402      * Returns the nanoTime-based trigger time of a delayed action.
403      */
404     long triggerTime(long delay) {
405         return Clock.currStdTime +
406             ((delay < (long.max >> 1)) ? delay : overflowFree(delay));
407     }
408 
409     /**
410      * Constrains the values of all delays in the queue to be within
411      * long.max of each other, to avoid overflow in compareTo.
412      * This may occur if a task is eligible to be dequeued, but has
413      * not yet been, while some other task is added with a delay of
414      * long.max.
415      */
416     private long overflowFree(long delay) {
417         Delayed head = cast(Delayed) super.getQueue().peek();
418         if (head !is null) {
419             long headDelay = head.getDelay().total!(TimeUnit.HectoNanosecond)();
420             if (headDelay < 0 && (delay - headDelay < 0))
421                 delay = long.max + headDelay;
422         }
423         return delay;
424     }
425 
426     /**
427      * @throws RejectedExecutionException {@inheritDoc}
428      * @throws NullPointerException       {@inheritDoc}
429      */
430     ScheduledFuture!(void) schedule(Runnable command, Duration delay) {
431         if (command is null)
432             throw new NullPointerException();
433         long n = atomicOp!"+="(sequencer, 1);
434         n--;
435         // RunnableScheduledFuture!(void) t = decorateTask(command,
436         //     new ScheduledFutureTask!(void)(command, cast(void)null, triggerTime(delay), n, this));
437         RunnableScheduledFuture!(void) t = decorateTask(command,
438             new ScheduledFutureTask!(void)(command, triggerTime(delay), n, this));        
439         delayedExecute!(void)(t);
440         return t;
441     }
442 
443     /**
444      * @throws RejectedExecutionException {@inheritDoc}
445      * @throws NullPointerException       {@inheritDoc}
446      */
447     ScheduledFuture!(V) schedule(V)(Callable!(V) callable, Duration delay) {
448         if (callable is null)
449             throw new NullPointerException();
450         RunnableScheduledFuture!(V) t = decorateTask(callable,
451             new ScheduledFutureTask!(V)(callable,
452                                        triggerTime(delay),
453                                        cast(long)AtomicHelper.getAndIncrement(sequencer), this));
454         delayedExecute(t);
455         return t;
456     }
457 
458     /**
459      * Submits a periodic action that becomes enabled first after the
460      * given initial delay, and subsequently with the given period;
461      * that is, executions will commence after
462      * {@code initialDelay}, then {@code initialDelay + period}, then
463      * {@code initialDelay + 2 * period}, and so on.
464      *
465      * <p>The sequence of task executions continues indefinitely until
466      * one of the following exceptional completions occur:
467      * <ul>
468      * <li>The task is {@linkplain Future#cancel explicitly cancelled}
469      * via the returned future.
470      * <li>Method {@link #shutdown} is called and the {@linkplain
471      * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
472      * whether to continue after shutdown} is not set true, or method
473      * {@link #shutdownNow} is called; also resulting in task
474      * cancellation.
475      * <li>An execution of the task throws an exception.  In this case
476      * calling {@link Future#get() get} on the returned future will throw
477      * {@link ExecutionException}, holding the exception as its cause.
478      * </ul>
479      * Subsequent executions are suppressed.  Subsequent calls to
480      * {@link Future#isDone isDone()} on the returned future will
481      * return {@code true}.
482      *
483      * <p>If any execution of this task takes longer than its period, then
484      * subsequent executions may start late, but will not concurrently
485      * execute.
486      *
487      * @throws RejectedExecutionException {@inheritDoc}
488      * @throws NullPointerException       {@inheritDoc}
489      * @throws IllegalArgumentException   {@inheritDoc}
490      */
491     ScheduledFuture!void scheduleAtFixedRate(Runnable command,
492                                                   Duration initialDelay,
493                                                   Duration period) {
494         if (command is null)
495             throw new NullPointerException();
496         if (period <= Duration.zero)
497             throw new IllegalArgumentException();
498 
499         ScheduledFutureTask!(void) sft =
500             new ScheduledFutureTask!(void)(command, // cast(void)null,
501                                           triggerTime(initialDelay),
502                                           period.total!(TimeUnit.HectoNanosecond)(), 
503                                           cast(long)AtomicHelper.getAndIncrement(sequencer), this);        
504         RunnableScheduledFuture!(void) t = decorateTask(command, sft);
505         sft.outerTask = t;
506         delayedExecute(t);
507         return t;
508     }
509 
510     /**
511      * Submits a periodic action that becomes enabled first after the
512      * given initial delay, and subsequently with the given delay
513      * between the termination of one execution and the commencement of
514      * the next.
515      *
516      * <p>The sequence of task executions continues indefinitely until
517      * one of the following exceptional completions occur:
518      * <ul>
519      * <li>The task is {@linkplain Future#cancel explicitly cancelled}
520      * via the returned future.
521      * <li>Method {@link #shutdown} is called and the {@linkplain
522      * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
523      * whether to continue after shutdown} is not set true, or method
524      * {@link #shutdownNow} is called; also resulting in task
525      * cancellation.
526      * <li>An execution of the task throws an exception.  In this case
527      * calling {@link Future#get() get} on the returned future will throw
528      * {@link ExecutionException}, holding the exception as its cause.
529      * </ul>
530      * Subsequent executions are suppressed.  Subsequent calls to
531      * {@link Future#isDone isDone()} on the returned future will
532      * return {@code true}.
533      *
534      * @throws RejectedExecutionException {@inheritDoc}
535      * @throws NullPointerException       {@inheritDoc}
536      * @throws IllegalArgumentException   {@inheritDoc}
537      */
538     ScheduledFuture!(void) scheduleWithFixedDelay(Runnable command,
539                                                      Duration initialDelay,
540                                                      Duration delay) {
541         if (command is null)
542             throw new NullPointerException();
543         if (delay <= Duration.zero)
544             throw new IllegalArgumentException();
545         ScheduledFutureTask!(void) sft =
546             new ScheduledFutureTask!(void)(command, // cast(void)null,
547                                           triggerTime(initialDelay),
548                                           -delay.total!(TimeUnit.HectoNanosecond)(),
549                                           cast(long)AtomicHelper.getAndIncrement(sequencer), this);
550         RunnableScheduledFuture!(void) t = decorateTask(command, sft);
551         sft.outerTask = t;
552         delayedExecute(t);
553         return t;
554     }
555 
556     /**
557      * Executes {@code command} with zero required delay.
558      * This has effect equivalent to
559      * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
560      * Note that inspections of the queue and of the list returned by
561      * {@code shutdownNow} will access the zero-delayed
562      * {@link ScheduledFuture}, not the {@code command} itself.
563      *
564      * <p>A consequence of the use of {@code ScheduledFuture} objects is
565      * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
566      * called with a null second {@code Throwable} argument, even if the
567      * {@code command} terminated abruptly.  Instead, the {@code Throwable}
568      * thrown by such a task can be obtained via {@link Future#get}.
569      *
570      * @throws RejectedExecutionException at discretion of
571      *         {@code RejectedExecutionHandler}, if the task
572      *         cannot be accepted for execution because the
573      *         executor has been shut down
574      * @throws NullPointerException {@inheritDoc}
575      */
576     override void execute(Runnable command) {
577         schedule(command, Duration.zero);
578     }
579 
580     // Override AbstractExecutorService methods
581 
582     /**
583      * @throws RejectedExecutionException {@inheritDoc}
584      * @throws NullPointerException       {@inheritDoc}
585      */
586     override Future!void submit(Runnable task) {
587         return schedule(task, Duration.zero);
588     }
589 
590     /**
591      * @throws RejectedExecutionException {@inheritDoc}
592      * @throws NullPointerException       {@inheritDoc}
593      */
594     Future!(T) submit(T)(Runnable task, T result) {
595         return schedule(Executors.callable(task, result), Duration.zero);
596     }
597 
598     /**
599      * @throws RejectedExecutionException {@inheritDoc}
600      * @throws NullPointerException       {@inheritDoc}
601      */
602     Future!(T) submit(T)(Callable!(T) task) {
603         return schedule(task, Duration.zero);
604     }
605 
606     /**
607      * Sets the policy on whether to continue executing existing
608      * periodic tasks even when this executor has been {@code shutdown}.
609      * In this case, executions will continue until {@code shutdownNow}
610      * or the policy is set to {@code false} when already shutdown.
611      * This value is by default {@code false}.
612      *
613      * @param value if {@code true}, continue after shutdown, else don't
614      * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
615      */
616     void setContinueExistingPeriodicTasksAfterShutdownPolicy(bool value) {
617         continueExistingPeriodicTasksAfterShutdown = value;
618         if (!value && isShutdown())
619             onShutdown();
620     }
621 
622     /**
623      * Gets the policy on whether to continue executing existing
624      * periodic tasks even when this executor has been {@code shutdown}.
625      * In this case, executions will continue until {@code shutdownNow}
626      * or the policy is set to {@code false} when already shutdown.
627      * This value is by default {@code false}.
628      *
629      * @return {@code true} if will continue after shutdown
630      * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
631      */
632     bool getContinueExistingPeriodicTasksAfterShutdownPolicy() {
633         return continueExistingPeriodicTasksAfterShutdown;
634     }
635 
636     /**
637      * Sets the policy on whether to execute existing delayed
638      * tasks even when this executor has been {@code shutdown}.
639      * In this case, these tasks will only terminate upon
640      * {@code shutdownNow}, or after setting the policy to
641      * {@code false} when already shutdown.
642      * This value is by default {@code true}.
643      *
644      * @param value if {@code true}, execute after shutdown, else don't
645      * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
646      */
647     void setExecuteExistingDelayedTasksAfterShutdownPolicy(bool value) {
648         executeExistingDelayedTasksAfterShutdown = value;
649         if (!value && isShutdown())
650             onShutdown();
651     }
652 
653     /**
654      * Gets the policy on whether to execute existing delayed
655      * tasks even when this executor has been {@code shutdown}.
656      * In this case, these tasks will only terminate upon
657      * {@code shutdownNow}, or after setting the policy to
658      * {@code false} when already shutdown.
659      * This value is by default {@code true}.
660      *
661      * @return {@code true} if will execute after shutdown
662      * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
663      */
664     bool getExecuteExistingDelayedTasksAfterShutdownPolicy() {
665         return executeExistingDelayedTasksAfterShutdown;
666     }
667 
668     /**
669      * Sets the policy on whether cancelled tasks should be immediately
670      * removed from the work queue at time of cancellation.  This value is
671      * by default {@code false}.
672      *
673      * @param value if {@code true}, remove on cancellation, else don't
674      * @see #getRemoveOnCancelPolicy
675      */
676     void setRemoveOnCancelPolicy(bool value) {
677         removeOnCancel = value;
678     }
679 
680     /**
681      * Gets the policy on whether cancelled tasks should be immediately
682      * removed from the work queue at time of cancellation.  This value is
683      * by default {@code false}.
684      *
685      * @return {@code true} if cancelled tasks are immediately removed
686      *         from the queue
687      * @see #setRemoveOnCancelPolicy
688      */
689     bool getRemoveOnCancelPolicy() {
690         return removeOnCancel;
691     }
692 
693     /**
694      * Initiates an orderly shutdown in which previously submitted
695      * tasks are executed, but no new tasks will be accepted.
696      * Invocation has no additional effect if already shut down.
697      *
698      * <p>This method does not wait for previously submitted tasks to
699      * complete execution.  Use {@link #awaitTermination awaitTermination}
700      * to do that.
701      *
702      * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
703      * has been set {@code false}, existing delayed tasks whose delays
704      * have not yet elapsed are cancelled.  And unless the {@code
705      * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
706      * {@code true}, future executions of existing periodic tasks will
707      * be cancelled.
708      *
709      * @throws SecurityException {@inheritDoc}
710      */
711     override void shutdown() {
712         super.shutdown();
713     }
714 
715     /**
716      * Attempts to stop all actively executing tasks, halts the
717      * processing of waiting tasks, and returns a list of the tasks
718      * that were awaiting execution. These tasks are drained (removed)
719      * from the task queue upon return from this method.
720      *
721      * <p>This method does not wait for actively executing tasks to
722      * terminate.  Use {@link #awaitTermination awaitTermination} to
723      * do that.
724      *
725      * <p>There are no guarantees beyond best-effort attempts to stop
726      * processing actively executing tasks.  This implementation
727      * interrupts tasks via {@link Thread#interrupt}; any task that
728      * fails to respond to interrupts may never terminate.
729      *
730      * @return list of tasks that never commenced execution.
731      *         Each element of this list is a {@link ScheduledFuture}.
732      *         For tasks submitted via one of the {@code schedule}
733      *         methods, the element will be identical to the returned
734      *         {@code ScheduledFuture}.  For tasks submitted using
735      *         {@link #execute execute}, the element will be a
736      *         zero-delay {@code ScheduledFuture}.
737      * @throws SecurityException {@inheritDoc}
738      */
739     override List!(Runnable) shutdownNow() {
740         return super.shutdownNow();
741     }
742 
743     /**
744      * Returns the task queue used by this executor.  Access to the
745      * task queue is intended primarily for debugging and monitoring.
746      * This queue may be in active use.  Retrieving the task queue
747      * does not prevent queued tasks from executing.
748      *
749      * <p>Each element of this queue is a {@link ScheduledFuture}.
750      * For tasks submitted via one of the {@code schedule} methods, the
751      * element will be identical to the returned {@code ScheduledFuture}.
752      * For tasks submitted using {@link #execute execute}, the element
753      * will be a zero-delay {@code ScheduledFuture}.
754      *
755      * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
756      * tasks in the order in which they will execute.
757      *
758      * @return the task queue
759      */
760     override BlockingQueue!(Runnable) getQueue() {
761         return super.getQueue();
762     }
763 }
764 
765 
766 /**
767 */
768 private class ScheduledFutureTask(V) : FutureTask!(V) , 
769     RunnableScheduledFuture!(V), IScheduledFutureTask {
770 
771     /** Sequence number to break ties FIFO */
772     private long sequenceNumber;
773 
774     /** The nanoTime-based time when the task is enabled to execute. */
775     private long time;
776 
777     /**
778      * Period for repeating tasks, in nanoseconds.
779      * A positive value indicates fixed-rate execution.
780      * A negative value indicates fixed-delay execution.
781      * A value of 0 indicates a non-repeating (one-shot) task.
782      */
783     private long period;
784 
785     /** The actual task to be re-enqueued by reExecutePeriodic */
786     RunnableScheduledFuture!(V) outerTask; // = this;
787     ScheduledThreadPoolExecutor poolExecutor;
788 
789     /**
790      * Index into delay queue, to support faster cancellation.
791      */
792     int _heapIndex;
793 
794 static if(is(V == void)) {         
795     this(Runnable r, long triggerTime,
796                         long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) {
797         super(r);
798         this.time = triggerTime;
799         this.period = 0;
800         this.sequenceNumber = sequenceNumber;
801         this.poolExecutor = poolExecutor;
802         initializeMembers();
803     }        
804 
805     /**
806      * Creates a periodic action with given nanoTime-based initial
807      * trigger time and period.
808      */
809     this(Runnable r, long triggerTime,
810                         long period, long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) {
811         super(r);
812         this.time = triggerTime;
813         this.period = period;
814         this.sequenceNumber = sequenceNumber;
815         this.poolExecutor = poolExecutor;
816         initializeMembers();
817     }
818 } else {
819 
820     /**
821      * Creates a one-shot action with given nanoTime-based trigger time.
822      */
823     this(Runnable r, V result, long triggerTime,
824                         long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) {
825         super(r, result);
826         this.time = triggerTime;
827         this.period = 0;
828         this.sequenceNumber = sequenceNumber;
829         this.poolExecutor = poolExecutor;
830         initializeMembers();
831     }           
832 
833     /**
834      * Creates a periodic action with given nanoTime-based initial
835      * trigger time and period.
836      */
837     this(Runnable r, V result, long triggerTime,
838                         long period, long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) {
839         super(r, result);
840         this.time = triggerTime;
841         this.period = period;
842         this.sequenceNumber = sequenceNumber;
843         this.poolExecutor = poolExecutor;
844         initializeMembers();
845     } 
846 }
847 
848     /**
849      * Creates a one-shot action with given nanoTime-based trigger time.
850      */
851     this(Callable!(V) callable, long triggerTime,
852                         long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) {
853         super(callable);
854         this.time = triggerTime;
855         this.period = 0;
856         this.sequenceNumber = sequenceNumber;
857         this.poolExecutor = poolExecutor;
858         initializeMembers();
859     }
860 
861     private void initializeMembers() {
862         outerTask = this;
863     }
864     
865     void heapIndex(int index) {
866         _heapIndex = index;
867     }
868 
869     int heapIndex() {
870         return _heapIndex;
871     }
872 
873     Duration getDelay() {
874         return dur!(TimeUnit.HectoNanosecond)(time - Clock.currStdTime()); 
875     }
876 
877     int opCmp(Delayed other) {
878         if (other == this) // compare zero if same object
879             return 0;
880         ScheduledFutureTask!V x = cast(ScheduledFutureTask!V)other;
881         if (x !is null) {
882             long diff = time - x.time;
883             if (diff < 0)
884                 return -1;
885             else if (diff > 0)
886                 return 1;
887             else if (sequenceNumber < x.sequenceNumber)
888                 return -1;
889             else
890                 return 1;
891         }
892         Duration diff = getDelay() - other.getDelay();
893         return (diff.isNegative) ? -1 : (diff > Duration.zero) ? 1 : 0;
894     }
895 
896     /**
897      * Returns {@code true} if this is a periodic (not a one-shot) action.
898      *
899      * @return {@code true} if periodic
900      */
901     bool isPeriodic() {
902         return period != 0;
903     }
904 
905     /**
906      * Sets the next time to run for a periodic task.
907      */
908     private void setNextRunTime() {
909         long p = period;
910         if (p > 0)
911             time += p;
912         else
913             time = poolExecutor.triggerTime(-p);
914     }
915 
916     override bool cancel(bool mayInterruptIfRunning) {
917         // The racy read of heapIndex below is benign:
918         // if heapIndex < 0, then OOTA guarantees that we have surely
919         // been removed; else we recheck under lock in remove()
920         bool cancelled = super.cancel(mayInterruptIfRunning);
921         if (cancelled && poolExecutor.removeOnCancel && heapIndex >= 0)
922             poolExecutor.remove(this);
923         return cancelled;
924     }
925 
926     /**
927      * Overrides FutureTask version so as to reset/requeue if periodic.
928      */
929     override void run() {
930         if (!poolExecutor.canRunInCurrentRunState(this))
931             cancel(false);
932         else if (!isPeriodic())
933             super.run();
934         else if (super.runAndReset()) {
935             setNextRunTime();
936             poolExecutor.reExecutePeriodic(outerTask);
937         }
938     }
939 
940     // alias from FutureTask
941     // alias isCancelled = FutureTask!V.isCancelled;
942     // alias isDone = FutureTask!V.isDone;
943     alias get = FutureTask!V.get;
944     
945     override bool isCancelled() {
946         return super.isCancelled();
947     }
948 
949     override bool isDone() {
950         return super.isDone();
951     }
952 
953     override V get() {
954         return super.get();
955     }
956 
957     override V get(Duration timeout) {
958         return super.get(timeout);
959     }
960 }
961 
962 
963 /**
964  * Specialized delay queue. To mesh with TPE declarations, this
965  * class must be declared as a BlockingQueue!(Runnable) even though
966  * it can only hold RunnableScheduledFutures.
967  */
968 class DelayedWorkQueue : AbstractQueue!(Runnable), BlockingQueue!(Runnable) {
969 
970     /*
971      * A DelayedWorkQueue is based on a heap-based data structure
972      * like those in DelayQueue and PriorityQueue, except that
973      * every ScheduledFutureTask also records its index into the
974      * heap array. This eliminates the need to find a task upon
975      * cancellation, greatly speeding up removal (down from O(n)
976      * to O(log n)), and reducing garbage retention that would
977      * otherwise occur by waiting for the element to rise to top
978      * before clearing. But because the queue may also hold
979      * RunnableScheduledFutures that are not ScheduledFutureTasks,
980      * we are not guaranteed to have such indices available, in
981      * which case we fall back to linear search. (We expect that
982      * most tasks will not be decorated, and that the faster cases
983      * will be much more common.)
984      *
985      * All heap operations must record index changes -- mainly
986      * within siftUp and siftDown. Upon removal, a task's
987      * heapIndex is set to -1. Note that ScheduledFutureTasks can
988      * appear at most once in the queue (this need not be true for
989      * other kinds of tasks or work queues), so are uniquely
990      * identified by heapIndex.
991      */
992 
993     private enum int INITIAL_CAPACITY = 16;
994     private IRunnableScheduledFuture[] queue;
995     private ReentrantLock lock;
996     private int _size;
997 
998     /**
999      * Thread designated to wait for the task at the head of the
1000      * queue.  This variant of the Leader-Follower pattern
1001      * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
1002      * minimize unnecessary timed waiting.  When a thread becomes
1003      * the leader, it waits only for the next delay to elapse, but
1004      * other threads await indefinitely.  The leader thread must
1005      * signal some other thread before returning from take() or
1006      * poll(...), unless some other thread becomes leader in the
1007      * interim.  Whenever the head of the queue is replaced with a
1008      * task with an earlier expiration time, the leader field is
1009      * invalidated by being reset to null, and some waiting
1010      * thread, but not necessarily the current leader, is
1011      * signalled.  So waiting threads must be prepared to acquire
1012      * and lose leadership while waiting.
1013      */
1014     private ThreadEx leader;
1015 
1016     /**
1017      * Condition signalled when a newer task becomes available at the
1018      * head of the queue or a new thread may need to become leader.
1019      */
1020     private Condition available;
1021 
1022     this() {
1023         initializeMembers();
1024     }
1025 
1026     private void initializeMembers() {
1027         lock = new ReentrantLock();
1028         available = new Condition(lock);
1029         queue = new IRunnableScheduledFuture[INITIAL_CAPACITY];
1030     }
1031 
1032     /**
1033      * Sets f's heapIndex if it is a ScheduledFutureTask.
1034      */
1035     private static void setIndex(IRunnableScheduledFuture f, int idx) {
1036         IScheduledFutureTask t = cast(IScheduledFutureTask)f;
1037         // tracef("index=%d, type: %s", idx, typeid(cast(Object)t));
1038         if (t !is null)
1039             t.heapIndex = idx;
1040     }
1041 
1042     /**
1043      * Sifts element added at bottom up to its heap-ordered spot.
1044      * Call only when holding lock.
1045      */
1046     private void siftUp(int k, IRunnableScheduledFuture key) {
1047         while (k > 0) {
1048             int parent = (k - 1) >>> 1;
1049             IRunnableScheduledFuture e = queue[parent];
1050             if (key >= e)
1051                 break;
1052             queue[k] = e;
1053             setIndex(e, k);
1054             k = parent;
1055         }
1056         // tracef("k=%d, key is null: %s", k, key is null);
1057         queue[k] = key;
1058         setIndex(key, k);
1059     }
1060 
1061     /**
1062      * Sifts element added at top down to its heap-ordered spot.
1063      * Call only when holding lock.
1064      */
1065     private void siftDown(int k, IRunnableScheduledFuture key) {
1066         int half = size >>> 1;
1067         while (k < half) {
1068             int child = (k << 1) + 1;
1069             IRunnableScheduledFuture c = queue[child];
1070             int right = child + 1;
1071             if (right < size && c.opCmp(queue[right]) > 0)
1072                 c = queue[child = right];
1073             if (key.opCmp(c) <= 0)
1074                 break;
1075             queue[k] = c;
1076             setIndex(c, k);
1077             k = child;
1078         }
1079         queue[k] = key;
1080         setIndex(key, k);
1081     }
1082 
1083     /**
1084      * Resizes the heap array.  Call only when holding lock.
1085      */
1086     private void grow() {
1087         size_t oldCapacity = queue.length;
1088         size_t newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
1089         if (newCapacity < 0) // overflow
1090             newCapacity = int.max;
1091         queue.length = newCapacity;
1092     }
1093 
1094     /**
1095      * Finds index of given object, or -1 if absent.
1096      */
1097     private int indexOf(Runnable x) {
1098         if (x !is null) {
1099             IScheduledFutureTask sf = cast(IScheduledFutureTask) x;
1100             if (sf !is null) {
1101                 int i = sf.heapIndex;
1102                 // Sanity check; x could conceivably be a
1103                 // ScheduledFutureTask from some other pool.
1104                 if (i >= 0 && i < size && queue[i] == x)
1105                     return i;
1106             } else {
1107                 for (int i = 0; i < size; i++) {
1108                     // if (x.opEquals(cast(Object)queue[i]))
1109                     if(x is queue[i])
1110                         return i;
1111                 }
1112             }
1113         }
1114         return -1;
1115     }
1116 
1117     override bool contains(Runnable x) {
1118         ReentrantLock lock = this.lock;
1119         lock.lock();
1120         try {
1121             return indexOf(x) != -1;
1122         } finally {
1123             lock.unlock();
1124         }
1125     }
1126 
1127     override bool remove(Runnable x) {
1128         ReentrantLock lock = this.lock;
1129         trace(cast(Object)x);
1130         lock.lock();
1131         try {
1132             int i = indexOf(x);
1133             if (i < 0)
1134                 return false;
1135 
1136             setIndex(queue[i], -1);
1137             int s = --_size;
1138             IRunnableScheduledFuture replacement = queue[s];
1139             queue[s] = null;
1140             if (s != i) {
1141                 siftDown(i, replacement);
1142                 if (queue[i] == replacement)
1143                     siftUp(i, replacement);
1144             }
1145             return true;
1146         } finally {
1147             lock.unlock();
1148         }
1149     }
1150 
1151     override int size() {
1152         // return _size;
1153         ReentrantLock lock = this.lock;
1154         lock.lock();
1155         try {
1156             return _size;
1157         } finally {
1158             lock.unlock();
1159         }
1160     }
1161 
1162     override bool isEmpty() {
1163         return size() == 0;
1164     }
1165 
1166     int remainingCapacity() {
1167         return int.max;
1168     }
1169 
1170     IRunnableScheduledFuture peek() {
1171         ReentrantLock lock = this.lock;
1172         lock.lock();
1173         try {
1174             return queue[0];
1175         } finally {
1176             lock.unlock();
1177         }
1178     }
1179 
1180     bool offer(Runnable x) {
1181         if (x is null)
1182             throw new NullPointerException();
1183         IRunnableScheduledFuture e = cast(IRunnableScheduledFuture)x;
1184         ReentrantLock lock = this.lock;
1185         lock.lock();
1186         try {
1187             int i = _size;
1188             if (i >= queue.length)
1189                 grow();
1190             _size = i + 1;
1191             if (i == 0) {
1192                 queue[0] = e;
1193                 setIndex(e, 0);
1194             } else {
1195                 siftUp(i, e);
1196             }
1197             if (queue[0] == e) {
1198                 leader = null;
1199                 available.notify();
1200             }
1201         } finally {
1202             lock.unlock();
1203         }
1204         return true;
1205     }
1206 
1207     override void put(Runnable e) {
1208         offer(e);
1209     }
1210 
1211     override bool add(Runnable e) {
1212         return offer(e);
1213     }
1214 
1215     bool offer(Runnable e, Duration timeout) {
1216         return offer(e);
1217     }
1218 
1219     /**
1220      * Performs common bookkeeping for poll and take: Replaces
1221      * first element with last and sifts it down.  Call only when
1222      * holding lock.
1223      * @param f the task to remove and return
1224      */
1225     private IRunnableScheduledFuture finishPoll(IRunnableScheduledFuture f) {
1226         int s = --_size;
1227         IRunnableScheduledFuture x = queue[s];
1228         queue[s] = null;
1229         if (s != 0)
1230             siftDown(0, x);
1231         setIndex(f, -1);
1232         return f;
1233     }
1234 
1235     IRunnableScheduledFuture poll() {
1236         ReentrantLock lock = this.lock;
1237         lock.lock();
1238         try {
1239             IRunnableScheduledFuture first = queue[0];
1240             return (first is null || first.getDelay() > Duration.zero)
1241                 ? null
1242                 : finishPoll(first);
1243         } finally {
1244             lock.unlock();
1245         }
1246     }
1247 
1248     IRunnableScheduledFuture take() {
1249         ReentrantLock lock = this.lock;
1250         // lock.lockInterruptibly();
1251         lock.lock();
1252         try {
1253             for (;;) {
1254                 IRunnableScheduledFuture first = queue[0];
1255                 if (first is null)
1256                     available.wait();
1257                 else {
1258                     Duration delay = first.getDelay();
1259                     if (delay <= Duration.zero)
1260                         return finishPoll(first);
1261                     first = null; // don't retain ref while waiting
1262                     if (leader !is null)
1263                         available.wait();
1264                     else {
1265                         ThreadEx thisThread = ThreadEx.currentThread();
1266                         leader = thisThread;
1267                         try {
1268                             available.wait(delay);
1269                         } finally {
1270                             if (leader == thisThread)
1271                                 leader = null;
1272                         }
1273                     }
1274                 }
1275             }
1276         } finally {
1277             if (leader is null && queue[0] !is null)
1278                 available.notify();
1279             lock.unlock();
1280         }
1281     }
1282 
1283     IRunnableScheduledFuture poll(Duration timeout) {
1284         // long nanos = total!(TimeUnit.HectoNanosecond)(timeout);
1285         Duration nanos = timeout;
1286         ReentrantLock lock = this.lock;
1287         // lock.lockInterruptibly();
1288         lock.lock();
1289         try {
1290             for (;;) {
1291                 IRunnableScheduledFuture first = queue[0];
1292                 if (first is null) {
1293                     if (nanos <= Duration.zero)
1294                         return null;
1295                     else
1296                         available.wait(nanos); // nanos = 
1297                 } else {
1298                     Duration delay = first.getDelay();
1299                     if (delay <= Duration.zero)
1300                         return finishPoll(first);
1301                     if (nanos <= Duration.zero)
1302                         return null;
1303                     first = null; // don't retain ref while waiting
1304                     if (nanos < delay || leader !is null)
1305                         available.wait(nanos); // nanos = 
1306                     else {
1307                         ThreadEx thisThread = ThreadEx.currentThread();
1308                         leader = thisThread;
1309                         try {
1310                             available.wait(delay);
1311                             nanos -= delay;
1312                             // long timeLeft = available.wait(delay);
1313                             // nanos -= delay - timeLeft;
1314                         } finally {
1315                             if (leader == thisThread)
1316                                 leader = null;
1317                         }
1318                     }
1319                 }
1320             }
1321         } finally {
1322             if (leader is null && queue[0] !is null)
1323                 available.notify();
1324             lock.unlock();
1325         }
1326     }
1327 
1328     override void clear() {
1329         ReentrantLock lock = this.lock;
1330         lock.lock();
1331         try {
1332             for (int i = 0; i < size; i++) {
1333                 IRunnableScheduledFuture t = queue[i];
1334                 if (t !is null) {
1335                     queue[i] = null;
1336                     setIndex(t, -1);
1337                 }
1338             }
1339             _size = 0;
1340         } finally {
1341             lock.unlock();
1342         }
1343     }
1344 
1345     int drainTo(Collection!(Runnable) c) {
1346         return drainTo(c, int.max);
1347     }
1348 
1349     int drainTo(Collection!(Runnable) c, int maxElements) {
1350         // Objects.requireNonNull(c);
1351 
1352         if (c == this)
1353             throw new IllegalArgumentException();
1354         if (maxElements <= 0)
1355             return 0;
1356         ReentrantLock lock = this.lock;
1357         lock.lock();
1358         try {
1359             int n = 0;
1360             for (IRunnableScheduledFuture first;
1361                  n < maxElements
1362                      && (first = queue[0]) !is null
1363                      && first.getDelay() <= Duration.zero;) {
1364                 c.add(first);   // In this order, in case add() throws.
1365                 finishPoll(first);
1366                 ++n;
1367             }
1368             return n;
1369         } finally {
1370             lock.unlock();
1371         }
1372     }
1373 
1374     override Runnable[] toArray() {
1375         ReentrantLock lock = this.lock;
1376         lock.lock();
1377         try {
1378             Runnable[] r = new Runnable[_size];
1379             for(int i=0; i<_size; i++) {
1380                 r[i] = queue[i];
1381             }
1382             return r;
1383 
1384         } finally {
1385             lock.unlock();
1386         }
1387     }
1388 
1389     override int opApply(scope int delegate(ref Runnable) dg) {
1390        if(dg is null)
1391             throw new NullPointerException();
1392         ReentrantLock lock = this.lock;
1393         lock.lock();
1394         scope(exit) lock.unlock();
1395 
1396         int result = 0;
1397         foreach(int i; 0.._size) {
1398             Runnable v = queue[i];
1399             result = dg(v);
1400             if(result != 0) return result;
1401         }
1402         return result;
1403     }
1404 
1405 
1406     // Iterator!(Runnable) iterator() {
1407     //     ReentrantLock lock = this.lock;
1408     //     lock.lock();
1409     //     try {
1410     //         return new Itr(Arrays.copyOf(queue, size));
1411     //     } finally {
1412     //         lock.unlock();
1413     //     }
1414     // }
1415 
1416     /**
1417      * Snapshot iterator that works off copy of underlying q array.
1418      */
1419     // private class Itr : Iterator!(Runnable) {
1420     //     final IRunnableScheduledFuture[] array;
1421     //     int cursor;        // index of next element to return; initially 0
1422     //     int lastRet = -1;  // index of last element returned; -1 if no such
1423 
1424     //     this(IRunnableScheduledFuture[] array) {
1425     //         this.array = array;
1426     //     }
1427 
1428     //     bool hasNext() {
1429     //         return cursor < array.length;
1430     //     }
1431 
1432     //     Runnable next() {
1433     //         if (cursor >= array.length)
1434     //             throw new NoSuchElementException();
1435     //         return array[lastRet = cursor++];
1436     //     }
1437 
1438     //     void remove() {
1439     //         if (lastRet < 0)
1440     //             throw new IllegalStateException();
1441     //         DelayedWorkQueue.this.remove(array[lastRet]);
1442     //         lastRet = -1;
1443     //     }
1444     // }
1445 
1446     override bool opEquals(IObject o) {
1447         return opEquals(cast(Object) o);
1448     }
1449 
1450     override bool opEquals(Object o) {
1451         return super.opEquals(o);
1452     }
1453     
1454     override string toString() {
1455         return super.toString();
1456     }
1457 
1458     override size_t toHash() @trusted nothrow {
1459         return super.toHash();
1460     }
1461 }