1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.concurrency.Executors;
13 
14 import hunt.concurrency.AbstractExecutorService;
15 import hunt.concurrency.atomic.AtomicHelper;
16 import hunt.concurrency.Delayed;
17 import hunt.concurrency.Exceptions;
18 import hunt.concurrency.ExecutorService;
19 // import hunt.concurrency.ForkJoinPool;
20 import hunt.concurrency.Future;
21 import hunt.concurrency.LinkedBlockingQueue;
22 import hunt.concurrency.ScheduledExecutorService;
23 import hunt.concurrency.ScheduledThreadPoolExecutor;
24 import hunt.concurrency.ThreadFactory;
25 import hunt.concurrency.ThreadPoolExecutor;
26 
27 import hunt.collection.List;
28 import hunt.Exceptions;
29 import hunt.logging.ConsoleLogger;
30 import hunt.util.Common;
31 import hunt.util.CompilerHelper;
32 import hunt.util.DateTime;
33 import hunt.util.Runnable;
34 
35 
36 static if(CompilerHelper.isGreaterThan(2093)) {
37     import core.thread.osthread;
38 } else {
39     import core.thread;
40 }
41 
42 import core.time;
43 import std.conv;
44 
45 /**
46  * Factory and utility methods for {@link Executor}, {@link
47  * ExecutorService}, {@link ScheduledExecutorService}, {@link
48  * ThreadFactory}, and {@link Callable} classes defined in this
49  * package. This class supports the following kinds of methods:
50  *
51  * <ul>
52  *   <li>Methods that create and return an {@link ExecutorService}
53  *       set up with commonly useful configuration settings.
54  *   <li>Methods that create and return a {@link ScheduledExecutorService}
55  *       set up with commonly useful configuration settings.
56  *   <li>Methods that create and return a "wrapped" ExecutorService, that
57  *       disables reconfiguration by making implementation-specific methods
58  *       inaccessible.
59  *   <li>Methods that create and return a {@link ThreadFactory}
60  *       that sets newly created threads to a known state.
61  *   <li>Methods that create and return a {@link Callable}
62  *       out of other closure-like forms, so they can be used
63  *       in execution methods requiring {@code Callable}.
64  * </ul>
65  *
66  * @author Doug Lea
67  */
68 class Executors {
69 
70     /**
71      * Creates a thread pool that reuses a fixed number of threads
72      * operating off a shared unbounded queue.  At any point, at most
73      * {@code nThreads} threads will be active processing tasks.
74      * If additional tasks are submitted when all threads are active,
75      * they will wait in the queue until a thread is available.
76      * If any thread terminates due to a failure during execution
77      * prior to shutdown, a new one will take its place if needed to
78      * execute subsequent tasks.  The threads in the pool will exist
79      * until it is explicitly {@link ExecutorService#shutdown shutdown}.
80      *
81      * @param nThreads the number of threads in the pool
82      * @return the newly created thread pool
83      * @throws IllegalArgumentException if {@code nThreads <= 0}
84      */
85     static ThreadPoolExecutor newFixedThreadPool(int nThreads) {
86         return new ThreadPoolExecutor(nThreads, nThreads, 0.hnsecs,
87                                       new LinkedBlockingQueue!(Runnable)());
88     }
89 
90     // /**
91     //  * Creates a thread pool that maintains enough threads to support
92     //  * the given parallelism level, and may use multiple queues to
93     //  * reduce contention. The parallelism level corresponds to the
94     //  * maximum number of threads actively engaged in, or available to
95     //  * engage in, task processing. The actual number of threads may
96     //  * grow and shrink dynamically. A work-stealing pool makes no
97     //  * guarantees about the order in which submitted tasks are
98     //  * executed.
99     //  *
100     //  * @param parallelism the targeted parallelism level
101     //  * @return the newly created thread pool
102     //  * @throws IllegalArgumentException if {@code parallelism <= 0}
103     //  */
104     // static ExecutorService newWorkStealingPool(int parallelism) {
105     //     return new ForkJoinPool
106     //         (parallelism,
107     //          ForkJoinPool.defaultForkJoinWorkerThreadFactory,
108     //          null, true);
109     // }
110 
111     // /**
112     //  * Creates a work-stealing thread pool using the number of
113     //  * {@linkplain Runtime#availableProcessors available processors}
114     //  * as its target parallelism level.
115     //  *
116     //  * @return the newly created thread pool
117     //  * @see #newWorkStealingPool(int)
118     //  */
119     // static ExecutorService newWorkStealingPool() {
120     //     return new ForkJoinPool
121     //         (Runtime.getRuntime().availableProcessors(),
122     //          ForkJoinPool.defaultForkJoinWorkerThreadFactory,
123     //          null, true);
124     // }
125 
126     /**
127      * Creates a thread pool that reuses a fixed number of threads
128      * operating off a shared unbounded queue, using the provided
129      * ThreadFactory to create new threads when needed.  At any point,
130      * at most {@code nThreads} threads will be active processing
131      * tasks.  If additional tasks are submitted when all threads are
132      * active, they will wait in the queue until a thread is
133      * available.  If any thread terminates due to a failure during
134      * execution prior to shutdown, a new one will take its place if
135      * needed to execute subsequent tasks.  The threads in the pool will
136      * exist until it is explicitly {@link ExecutorService#shutdown
137      * shutdown}.
138      *
139      * @param nThreads the number of threads in the pool
140      * @param threadFactory the factory to use when creating new threads
141      * @return the newly created thread pool
142      * @throws NullPointerException if threadFactory is null
143      * @throws IllegalArgumentException if {@code nThreads <= 0}
144      */
145     static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
146         return new ThreadPoolExecutor(nThreads, nThreads, 0.msecs,
147                                       new LinkedBlockingQueue!(Runnable)(),
148                                       threadFactory);
149     }
150 
    // /**
    //  * Creates an Executor that uses a single worker thread operating
    //  * off an unbounded queue. (Note however that if this single
    //  * thread terminates due to a failure during execution prior to
    //  * shutdown, a new one will take its place if needed to execute
    //  * subsequent tasks.)  Tasks are guaranteed to execute
    //  * sequentially, and no more than one task will be active at any
    //  * given time. Unlike the otherwise equivalent
    //  * {@code newFixedThreadPool(1)} the returned executor is
    //  * guaranteed not to be reconfigurable to use additional threads.
    //  *
    //  * @return the newly created single-threaded Executor
    //  */
164     // static ExecutorService newSingleThreadExecutor() {
165     //     return new FinalizableDelegatedExecutorService
166     //         (new ThreadPoolExecutor(1, 1,
167     //                                 0L, TimeUnit.MILLISECONDS,
168     //                                 new LinkedBlockingQueue!(Runnable)()));
169     // }
170 
171     // /**
172     //  * Creates an Executor that uses a single worker thread operating
173     //  * off an unbounded queue, and uses the provided ThreadFactory to
174     //  * create a new thread when needed. Unlike the otherwise
175     //  * equivalent {@code newFixedThreadPool(1, threadFactory)} the
176     //  * returned executor is guaranteed not to be reconfigurable to use
177     //  * additional threads.
178     //  *
179     //  * @param threadFactory the factory to use when creating new threads
180     //  * @return the newly created single-threaded Executor
181     //  * @throws NullPointerException if threadFactory is null
182     //  */
183     // static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
184     //     return new FinalizableDelegatedExecutorService
185     //         (new ThreadPoolExecutor(1, 1,
186     //                                 0L, TimeUnit.MILLISECONDS,
187     //                                 new LinkedBlockingQueue!(Runnable)(),
188     //                                 threadFactory));
189     // }
190 
    // /**
    //  * Creates a thread pool that creates new threads as needed, but
    //  * will reuse previously constructed threads when they are
    //  * available.  These pools will typically improve the performance
    //  * of programs that execute many short-lived asynchronous tasks.
    //  * Calls to {@code execute} will reuse previously constructed
    //  * threads if available. If no existing thread is available, a new
    //  * thread will be created and added to the pool. Threads that have
    //  * not been used for sixty seconds are terminated and removed from
    //  * the cache. Thus, a pool that remains idle for long enough will
    //  * not consume any resources. Note that pools with similar
    //  * properties but different details (for example, timeout parameters)
    //  * may be created using {@link ThreadPoolExecutor} constructors.
    //  *
    //  * @return the newly created thread pool
    //  */
207     // static ExecutorService newCachedThreadPool() {
208     //     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
209     //                                   60L, TimeUnit.SECONDS,
210     //                                   new SynchronousQueue!(Runnable)());
211     // }
212 
213     // /**
214     //  * Creates a thread pool that creates new threads as needed, but
215     //  * will reuse previously constructed threads when they are
216     //  * available, and uses the provided
217     //  * ThreadFactory to create new threads when needed.
218     //  *
219     //  * @param threadFactory the factory to use when creating new threads
220     //  * @return the newly created thread pool
221     //  * @throws NullPointerException if threadFactory is null
222     //  */
223     // static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
224     //     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
225     //                                   60L, TimeUnit.SECONDS,
226     //                                   new SynchronousQueue!(Runnable)(),
227     //                                   threadFactory);
228     // }
229 
230     /**
231      * Creates a single-threaded executor that can schedule commands
232      * to run after a given delay, or to execute periodically.
233      * (Note however that if this single
234      * thread terminates due to a failure during execution prior to
235      * shutdown, a new one will take its place if needed to execute
236      * subsequent tasks.)  Tasks are guaranteed to execute
237      * sequentially, and no more than one task will be active at any
238      * given time. Unlike the otherwise equivalent
239      * {@code newScheduledThreadPool(1)} the returned executor is
240      * guaranteed not to be reconfigurable to use additional threads.
241      *
242      * @return the newly created scheduled executor
243      */
244     static ScheduledExecutorService newSingleThreadScheduledExecutor() {
245         return new DelegatedScheduledExecutorService!ScheduledThreadPoolExecutor
246             (new ScheduledThreadPoolExecutor(1));
247     }
248 
249     /**
250      * Creates a single-threaded executor that can schedule commands
251      * to run after a given delay, or to execute periodically.  (Note
252      * however that if this single thread terminates due to a failure
253      * during execution prior to shutdown, a new one will take its
254      * place if needed to execute subsequent tasks.)  Tasks are
255      * guaranteed to execute sequentially, and no more than one task
256      * will be active at any given time. Unlike the otherwise
257      * equivalent {@code newScheduledThreadPool(1, threadFactory)}
258      * the returned executor is guaranteed not to be reconfigurable to
259      * use additional threads.
260      *
261      * @param threadFactory the factory to use when creating new threads
262      * @return the newly created scheduled executor
263      * @throws NullPointerException if threadFactory is null
264      */
265     static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
266         return new DelegatedScheduledExecutorService!ScheduledThreadPoolExecutor
267             (new ScheduledThreadPoolExecutor(1, threadFactory));
268     }
269 
270     /**
271      * Creates a thread pool that can schedule commands to run after a
272      * given delay, or to execute periodically.
273      * @param corePoolSize the number of threads to keep in the pool,
274      * even if they are idle
275      * @return the newly created scheduled thread pool
276      * @throws IllegalArgumentException if {@code corePoolSize < 0}
277      */
278     static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
279         return new ScheduledThreadPoolExecutor(corePoolSize);
280     }
281 
282     /**
283      * Creates a thread pool that can schedule commands to run after a
284      * given delay, or to execute periodically.
285      * @param corePoolSize the number of threads to keep in the pool,
286      * even if they are idle
287      * @param threadFactory the factory to use when the executor
288      * creates a new thread
289      * @return the newly created scheduled thread pool
290      * @throws IllegalArgumentException if {@code corePoolSize < 0}
291      * @throws NullPointerException if threadFactory is null
292      */
293     static ScheduledExecutorService newScheduledThreadPool(
294             int corePoolSize, ThreadFactory threadFactory) {
295         return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
296     }
297 
298     // /**
299     //  * Returns an object that delegates all defined {@link
300     //  * ExecutorService} methods to the given executor, but not any
301     //  * other methods that might otherwise be accessible using
302     //  * casts. This provides a way to safely "freeze" configuration and
303     //  * disallow tuning of a given concrete implementation.
304     //  * @param executor the underlying implementation
305     //  * @return an {@code ExecutorService} instance
306     //  * @throws NullPointerException if executor null
307     //  */
308     // static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
309     //     if (executor is null)
310     //         throw new NullPointerException();
311     //     return new DelegatedExecutorService(executor);
312     // }
313 
314     // /**
315     //  * Returns an object that delegates all defined {@link
316     //  * ScheduledExecutorService} methods to the given executor, but
317     //  * not any other methods that might otherwise be accessible using
318     //  * casts. This provides a way to safely "freeze" configuration and
319     //  * disallow tuning of a given concrete implementation.
320     //  * @param executor the underlying implementation
321     //  * @return a {@code ScheduledExecutorService} instance
322     //  * @throws NullPointerException if executor null
323     //  */
324     // static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) {
325     //     if (executor is null)
326     //         throw new NullPointerException();
327     //     return new DelegatedScheduledExecutorService(executor);
328     // }
329 
330     /**
331      * Returns a default thread factory used to create new threads.
332      * This factory creates all new threads used by an Executor in the
333      * same {@link ThreadGroupEx}. If there is a {@link
334      * java.lang.SecurityManager}, it uses the group of {@link
335      * System#getSecurityManager}, else the group of the thread
336      * invoking this {@code defaultThreadFactory} method. Each new
337      * thread is created as a non-daemon thread with priority set to
338      * the smaller of {@code Thread.PRIORITY_DEFAULT} and the maximum
339      * priority permitted in the thread group.  New threads have names
340      * accessible via {@link Thread#getName} of
341      * <em>pool-N-thread-M</em>, where <em>N</em> is the sequence
342      * number of this factory, and <em>M</em> is the sequence number
343      * of the thread created by this factory.
344      * @return a thread factory
345      */
346     static ThreadFactory defaultThreadFactory() {
347         return ThreadFactory.defaultThreadFactory();
348     }
349 
350     // /**
351     //  * Returns a thread factory used to create new threads that
352     //  * have the same permissions as the current thread.
353     //  * This factory creates threads with the same settings as {@link
354     //  * Executors#defaultThreadFactory}, additionally setting the
355     //  * AccessControlContext and contextClassLoader of new threads to
356     //  * be the same as the thread invoking this
357     //  * {@code privilegedThreadFactory} method.  A new
358     //  * {@code privilegedThreadFactory} can be created within an
359     //  * {@link AccessController#doPrivileged AccessController.doPrivileged}
360     //  * action setting the current thread's access control context to
361     //  * create threads with the selected permission settings holding
362     //  * within that action.
363     //  *
364     //  * <p>Note that while tasks running within such threads will have
365     //  * the same access control and class loader settings as the
366     //  * current thread, they need not have the same {@link
367     //  * java.lang.ThreadLocal} or {@link
368     //  * java.lang.InheritableThreadLocal} values. If necessary,
369     //  * particular values of thread locals can be set or reset before
370     //  * any task runs in {@link ThreadPoolExecutor} subclasses using
371     //  * {@link ThreadPoolExecutor#beforeExecute(Thread, Runnable)}.
372     //  * Also, if it is necessary to initialize worker threads to have
373     //  * the same InheritableThreadLocal settings as some other
374     //  * designated thread, you can create a custom ThreadFactory in
375     //  * which that thread waits for and services requests to create
376     //  * others that will inherit its values.
377     //  *
378     //  * @return a thread factory
379     //  * @throws AccessControlException if the current access control
380     //  * context does not have permission to both get and set context
381     //  * class loader
382     //  */
383     // static ThreadFactory privilegedThreadFactory() {
384     //     return new PrivilegedThreadFactory();
385     // }
386 
    /**
     * Returns a {@link Callable} object that, when called, runs the
     * given task.  The overload taking only {@code task} produces a
     * {@code Callable!(void)}; the overload that also takes
     * {@code result} produces a {@code Callable!(T)} that returns the
     * given result.  This can be useful when applying methods
     * requiring a {@code Callable} to an otherwise resultless action.
     * @param task the task to run
     * @param result the result to return (second overload only)
     * @param (T) the type of the result (second overload only)
     * @return a callable object
     * @throws NullPointerException if task null
     */
398     static Callable!(void) callable(Runnable task) {
399         if (task is null)
400             throw new NullPointerException();
401         return new RunnableAdapter!(void)(task);
402     }
403 
404     static Callable!(T) callable(T)(Runnable task, T result) if(!is(T == void)) {
405         if (task is null)
406             throw new NullPointerException();
407         return new RunnableAdapter!(T)(task, result);
408     }
409 
    // /**
    //  * Returns a {@link Callable} object that, when
    //  * called, runs the given task and returns {@code null}.
    //  * @param task the task to run
    //  * @return a callable object
    //  * @throws NullPointerException if task null
    //  */
417     // static Callable!(Object) callable(Runnable task) {
418     //     if (task is null)
419     //         throw new NullPointerException();
420     //     return new RunnableAdapter!(Object)(task, null);
421     // }
422 
423     // /**
424     //  * Returns a {@link Callable} object that, when
425     //  * called, runs the given privileged action and returns its result.
426     //  * @param action the privileged action to run
427     //  * @return a callable object
428     //  * @throws NullPointerException if action null
429     //  */
430     // static Callable!(Object) callable(PrivilegedAction<?> action) {
431     //     if (action is null)
432     //         throw new NullPointerException();
433     //     return new Callable!(Object)() {
434     //         Object call() { return action.run(); }};
435     // }
436 
437     // /**
438     //  * Returns a {@link Callable} object that, when
439     //  * called, runs the given privileged exception action and returns
440     //  * its result.
441     //  * @param action the privileged exception action to run
442     //  * @return a callable object
443     //  * @throws NullPointerException if action null
444     //  */
445     // static Callable!(Object) callable(PrivilegedExceptionAction<?> action) {
446     //     if (action is null)
447     //         throw new NullPointerException();
448     //     return new Callable!(Object)() {
449     //         Object call() throws Exception { return action.run(); }};
450     // }
451 
452     // /**
453     //  * Returns a {@link Callable} object that will, when called,
454     //  * execute the given {@code callable} under the current access
455     //  * control context. This method should normally be invoked within
456     //  * an {@link AccessController#doPrivileged AccessController.doPrivileged}
457     //  * action to create callables that will, if possible, execute
458     //  * under the selected permission settings holding within that
459     //  * action; or if not possible, throw an associated {@link
460     //  * AccessControlException}.
461     //  * @param callable the underlying task
462     //  * @param (T) the type of the callable's result
463     //  * @return a callable object
464     //  * @throws NullPointerException if callable null
465     //  */
466     // static !(T) Callable!(T) privilegedCallable(Callable!(T) callable) {
467     //     if (callable is null)
468     //         throw new NullPointerException();
469     //     return new PrivilegedCallable!(T)(callable);
470     // }
471 
472     // /**
473     //  * Returns a {@link Callable} object that will, when called,
474     //  * execute the given {@code callable} under the current access
475     //  * control context, with the current context class loader as the
476     //  * context class loader. This method should normally be invoked
477     //  * within an
478     //  * {@link AccessController#doPrivileged AccessController.doPrivileged}
479     //  * action to create callables that will, if possible, execute
480     //  * under the selected permission settings holding within that
481     //  * action; or if not possible, throw an associated {@link
482     //  * AccessControlException}.
483     //  *
484     //  * @param callable the underlying task
485     //  * @param (T) the type of the callable's result
486     //  * @return a callable object
487     //  * @throws NullPointerException if callable null
488     //  * @throws AccessControlException if the current access control
489     //  * context does not have permission to both set and get context
490     //  * class loader
491     //  */
492     // static !(T) Callable!(T) privilegedCallableUsingCurrentClassLoader(Callable!(T) callable) {
493     //     if (callable is null)
494     //         throw new NullPointerException();
495     //     return new PrivilegedCallableUsingCurrentClassLoader!(T)(callable);
496     // }
497 
498 
499     // Methods for ExecutorService
500 
501     /**
502      * Submits a Runnable task for execution and returns a Future
503      * representing that task. The Future's {@code get} method will
504      * return {@code null} upon <em>successful</em> completion.
505      *
506      * @param task the task to submit
507      * @return a Future representing pending completion of the task
508      * @throws RejectedExecutionException if the task cannot be
509      *         scheduled for execution
510      * @throws NullPointerException if the task is null
511      */
512     static Future!(void) submit(ExecutorService es, Runnable task) {
513 
514         AbstractExecutorService aes = cast(AbstractExecutorService)es;
515         if(aes is null) 
516             throw new RejectedExecutionException("ExecutorService is null");
517         else
518             return aes.submit(task);
519 
520         // TypeInfo typeInfo = typeid(cast(Object)es);
521         // if(typeInfo == typeid(ThreadPoolExecutor)) {
522         //     AbstractExecutorService aes = cast(AbstractExecutorService)es;
523         //     return aes.submit(task);
524         // } else {
525         //     implementationMissing(false);
526         // }
527     }
528 
529     /**
530      * Submits a Runnable task for execution and returns a Future
531      * representing that task. The Future's {@code get} method will
532      * return the given result upon successful completion.
533      *
534      * @param task the task to submit
535      * @param result the result to return
536      * @param (T) the type of the result
537      * @return a Future representing pending completion of the task
538      * @throws RejectedExecutionException if the task cannot be
539      *         scheduled for execution
540      * @throws NullPointerException if the task is null
541      */
542     static Future!(T) submit(T)(ExecutorService es, Runnable task, T result) {
543         AbstractExecutorService aes = cast(AbstractExecutorService)es;
544         if(aes is null) 
545             throw new RejectedExecutionException("ExecutorService is null");
546         else
547             return aes.submit!T(task, result);
548                     
549         // TypeInfo typeInfo = typeid(cast(Object)es);
550         // if(typeInfo == typeid(ThreadPoolExecutor)) {
551         //     AbstractExecutorService aes = cast(AbstractExecutorService)es;
552         //     if(aes is null) 
553         //         throw new RejectedExecutionException("ExecutorService is null");
554         //     else
555         //         return aes.submit!T(task, result);
556         // } else {
557         //     implementationMissing(false);
558         // }
559     }
560 
561     /**
562      * Submits a value-returning task for execution and returns a
563      * Future representing the pending results of the task. The
564      * Future's {@code get} method will return the task's result upon
565      * successful completion.
566      *
567      * <p>
568      * If you would like to immediately block waiting
569      * for a task, you can use constructions of the form
570      * {@code result = exec.submit(aCallable).get();}
571      *
572      * <p>Note: The {@link Executors} class includes a set of methods
573      * that can convert some other common closure-like objects,
574      * for example, {@link java.security.PrivilegedAction} to
575      * {@link Callable} form so they can be submitted.
576      *
577      * @param task the task to submit
578      * @param (T) the type of the task's result
579      * @return a Future representing pending completion of the task
580      * @throws RejectedExecutionException if the task cannot be
581      *         scheduled for execution
582      * @throws NullPointerException if the task is null
583      */
584     static Future!(T) submit(T)(ExecutorService es, Callable!(T) task) {
585         AbstractExecutorService aes = cast(AbstractExecutorService)es;
586         if(aes is null) 
587             throw new RejectedExecutionException("ExecutorService is null");
588         else
589             return aes.submit!(T)(task);
590             
591         // TypeInfo typeInfo = typeid(cast(Object)es);
592         // if(typeInfo == typeid(ThreadPoolExecutor)) {
593         //     AbstractExecutorService aes = cast(AbstractExecutorService)es;
594         //     if(aes is null) 
595         //         throw new RejectedExecutionException("ExecutorService is null");
596         //     else
597         //         return aes.submit!(T)(task);
598         // } else {
599         //     implementationMissing(false);
600         // }
601     }
602 
603     /**
604      * Executes the given tasks, returning a list of Futures holding
605      * their status and results when all complete.
606      * {@link Future#isDone} is {@code true} for each
607      * element of the returned list.
608      * Note that a <em>completed</em> task could have
609      * terminated either normally or by throwing an exception.
610      * The results of this method are undefined if the given
611      * collection is modified while this operation is in progress.
612      *
613      * @param tasks the collection of tasks
614      * @param (T) the type of the values returned from the tasks
615      * @return a list of Futures representing the tasks, in the same
616      *         sequential order as produced by the iterator for the
617      *         given task list, each of which has completed
618      * @throws InterruptedException if interrupted while waiting, in
619      *         which case unfinished tasks are cancelled
620      * @throws NullPointerException if tasks or any of its elements are {@code null}
621      * @throws RejectedExecutionException if any task cannot be
622      *         scheduled for execution
623      */
624     static List!(Future!(T)) invokeAll(T)(ExecutorService es, Collection!(Callable!(T)) tasks) {
625 
626         AbstractExecutorService aes = cast(AbstractExecutorService)es;
627         if(aes is null) 
628             throw new RejectedExecutionException("ExecutorService is null");
629         else {
630             aes.invokeAll!(T)(tasks);
631         }
632 
633     }
634 
635     /**
636      * Executes the given tasks, returning a list of Futures holding
637      * their status and results
638      * when all complete or the timeout expires, whichever happens first.
639      * {@link Future#isDone} is {@code true} for each
640      * element of the returned list.
641      * Upon return, tasks that have not completed are cancelled.
642      * Note that a <em>completed</em> task could have
643      * terminated either normally or by throwing an exception.
644      * The results of this method are undefined if the given
645      * collection is modified while this operation is in progress.
646      *
647      * @param tasks the collection of tasks
648      * @param timeout the maximum time to wait
649      * @param unit the time unit of the timeout argument
650      * @param (T) the type of the values returned from the tasks
651      * @return a list of Futures representing the tasks, in the same
652      *         sequential order as produced by the iterator for the
653      *         given task list. If the operation did not time out,
654      *         each task will have completed. If it did time out, some
655      *         of these tasks will not have completed.
656      * @throws InterruptedException if interrupted while waiting, in
657      *         which case unfinished tasks are cancelled
     * @throws NullPointerException if tasks or any of its elements
     *         are {@code null}
660      * @throws RejectedExecutionException if any task cannot be scheduled
661      *         for execution
662      */
663     static List!(Future!(T)) invokeAll(T)(ExecutorService es, Collection!(Callable!(T)) tasks,
664                                   Duration timeout) {
665         AbstractExecutorService aes = cast(AbstractExecutorService)es;
666         if(aes is null) 
667             throw new RejectedExecutionException("ExecutorService is null");
668         else {
669             aes.invokeAll!(T)(tasks, timeout);
670         }
671     }
672 
673     /**
674      * Executes the given tasks, returning the result
675      * of one that has completed successfully (i.e., without throwing
676      * an exception), if any do. Upon normal or exceptional return,
677      * tasks that have not completed are cancelled.
678      * The results of this method are undefined if the given
679      * collection is modified while this operation is in progress.
680      *
681      * @param tasks the collection of tasks
682      * @param (T) the type of the values returned from the tasks
683      * @return the result returned by one of the tasks
684      * @throws InterruptedException if interrupted while waiting
685      * @throws NullPointerException if tasks or any element task
686      *         subject to execution is {@code null}
687      * @throws IllegalArgumentException if tasks is empty
688      * @throws ExecutionException if no task successfully completes
689      * @throws RejectedExecutionException if tasks cannot be scheduled
690      *         for execution
691      */
692     static T invokeAny(T)(ExecutorService es, Collection!(Callable!(T)) tasks) {
693         AbstractExecutorService aes = cast(AbstractExecutorService)es;
694         if(aes is null) 
695             throw new RejectedExecutionException("ExecutorService is null");
696         else {
697             aes.invokeAny!(T)(tasks);
698         }
699     }
700 
701     /**
702      * Executes the given tasks, returning the result
703      * of one that has completed successfully (i.e., without throwing
704      * an exception), if any do before the given timeout elapses.
705      * Upon normal or exceptional return, tasks that have not
706      * completed are cancelled.
707      * The results of this method are undefined if the given
708      * collection is modified while this operation is in progress.
709      *
     * @param es the executor service that runs the tasks
     * @param tasks the collection of tasks
     * @param timeout the maximum time to wait
713      * @param (T) the type of the values returned from the tasks
714      * @return the result returned by one of the tasks
715      * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if tasks or any element
     *         task subject to execution is {@code null}
718      * @throws TimeoutException if the given timeout elapses before
719      *         any task successfully completes
720      * @throws ExecutionException if no task successfully completes
721      * @throws RejectedExecutionException if tasks cannot be scheduled
722      *         for execution
723      */
724     static T invokeAny(T)(ExecutorService es, Collection!(Callable!(T)) tasks,
725                     Duration timeout)  {
726         AbstractExecutorService aes = cast(AbstractExecutorService)es;
727         if(aes is null) 
728             throw new RejectedExecutionException("ExecutorService is null");
729         else {
730             aes.invokeAny!(T)(tasks, timeout);
731         }
732     }
733 
    /**
     * Cannot instantiate: the constructor is private and empty, so code
     * outside this module cannot create instances.
     */
    private this() {}
736 }
737 
// Non-public classes supporting the public methods
739 
740 /**
741  * A callable that runs given task and returns given result.
742  */
743 private final class RunnableAdapter(T) : Callable!(T) if(is(T == void)) {
744     private Runnable task;
745     this(Runnable task) {
746         this.task = task;
747     }
748 
749     T call() {
750         try {
751             task.run();
752         } catch(Throwable th) {
753             warning(th.msg);
754             version(HUNT_DEBUG) {
755                 warning(th);
756             }
757         }
758     }
759 
760     override string toString() {
761         return super.toString() ~ "[Wrapped task = " ~ (cast(Object)task).toString() ~ "]";
762     }
763 }
764 
/**
 * Adapts a {@link Runnable} to a {@code Callable!(T)} that yields a fixed,
 * pre-supplied result once the task has been run.
 * Anything thrown by the wrapped task is logged as a warning and is not
 * propagated; the stored result is returned regardless.
 */
private final class RunnableAdapter(T) : Callable!(T) if(!is(T == void)) {
    private Runnable task;
    private T result;

    this(Runnable task, T result) {
        this.result = result;
        this.task = task;
    }

    /** Runs the wrapped task, then returns the result given at construction. */
    T call() {
        try {
            task.run();
        } catch(Throwable failure) {
            warning(failure.msg);
            // The full throwable is logged in debug builds only.
            version(HUNT_DEBUG) warning(failure);
        }
        return result;
    }

    /** Appends a description of the wrapped task to the default representation. */
    override string toString() {
        string prefix = super.toString() ~ "[Wrapped task = ";
        return prefix ~ (cast(Object)task).toString() ~ "]";
    }
}
790 
791 // /**
792 //  * A callable that runs under established access control settings.
793 //  */
794 // private final class PrivilegedCallable!(T) : Callable!(T) {
795 //     Callable!(T) task;
796 //     AccessControlContext acc;
797 
798 //     PrivilegedCallable(Callable!(T) task) {
799 //         this.task = task;
800 //         this.acc = AccessController.getContext();
801 //     }
802 
803 //     T call() throws Exception {
804 //         try {
805 //             return AccessController.doPrivileged(
806 //                 new PrivilegedExceptionAction!(T)() {
807 //                     T run() throws Exception {
808 //                         return task.call();
809 //                     }
810 //                 }, acc);
811 //         } catch (PrivilegedActionException e) {
812 //             throw e.getException();
813 //         }
814 //     }
815 
816 //     string toString() {
817 //         return super.toString() ~ "[Wrapped task = " ~ task ~ "]";
818 //     }
819 // }
820 
821 // /**
822 //  * A callable that runs under established access control settings and
823 //  * current ClassLoader.
824 //  */
825 // private final class PrivilegedCallableUsingCurrentClassLoader(T)
826 //         : Callable!(T) {
827 //     Callable!(T) task;
828 //     AccessControlContext acc;
829 //     ClassLoader ccl;
830 
831 //     this(Callable!(T) task) {
832 //         SecurityManager sm = System.getSecurityManager();
833 //         if (sm !is null) {
834 //             // Calls to getContextClassLoader from this class
835 //             // never trigger a security check, but we check
836 //             // whether our callers have this permission anyways.
837 //             sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);
838 
839 //             // Whether setContextClassLoader turns out to be necessary
840 //             // or not, we fail fast if permission is not available.
841 //             sm.checkPermission(new RuntimePermission("setContextClassLoader"));
842 //         }
843 //         this.task = task;
844 //         this.acc = AccessController.getContext();
845 //         this.ccl = Thread.getThis().getContextClassLoader();
846 //     }
847 
848 //     T call() throws Exception {
849 //         try {
850 //             return AccessController.doPrivileged(
851 //                 new PrivilegedExceptionAction!(T)() {
852 //                     T run() throws Exception {
853 //                         Thread t = Thread.getThis();
854 //                         ClassLoader cl = t.getContextClassLoader();
855 //                         if (ccl == cl) {
856 //                             return task.call();
857 //                         } else {
858 //                             t.setContextClassLoader(ccl);
859 //                             try {
860 //                                 return task.call();
861 //                             } finally {
862 //                                 t.setContextClassLoader(cl);
863 //                             }
864 //                         }
865 //                     }
866 //                 }, acc);
867 //         } catch (PrivilegedActionException e) {
868 //             throw e.getException();
869 //         }
870 //     }
871 
872 //     string toString() {
873 //         return super.toString() ~ "[Wrapped task = " ~ task ~ "]";
874 //     }
875 // }
876 
/**
 * No-op placeholder that accepts an ExecutorService and ignores it.
 *
 * NOTE(review): presumably stands in for Java's
 * Reference.reachabilityFence from the code this module was ported from;
 * confirm before removing.
 */
void reachabilityFence(ExecutorService) {
    // do nothing;
    // TODO: Tasks pending completion -@zxp at 5/10/2019, 10:50:31 AM    
    // remove this
}
882 
883 /**
884  * A wrapper class that exposes only the ExecutorService methods
885  * of an ExecutorService implementation.
886  */
private class DelegatedExecutorService(U) : ExecutorService 
    if(is(U : ExecutorService)) {

    // The wrapped executor; every method below forwards to it.
    private U e;

    /**
     * Params:
     *  executor = the executor that all calls are forwarded to
     */
    this(U executor) { e = executor; }

    /** Forwards the command to the wrapped executor. */
    void execute(Runnable command) {
        try {
            e.execute(command);
        } finally { reachabilityFence(this); }
    }

    /** Forwards the shutdown request to the wrapped executor. */
    void shutdown() { e.shutdown(); }

    /** Forwards to the wrapped executor and returns its list of runnables. */
    List!(Runnable) shutdownNow() {
        try {
            return e.shutdownNow();
        } finally { reachabilityFence(this); }
    }

    /** Returns the wrapped executor's shutdown state. */
    bool isShutdown() {
        try {
            return e.isShutdown();
        } finally { reachabilityFence(this); }
    }

    /** Returns the wrapped executor's termination state. */
    bool isTerminated() {
        try {
            return e.isTerminated();
        } finally { reachabilityFence(this); }
    }

    /** Forwards the wait, bounded by timeout, to the wrapped executor. */
    bool awaitTermination(Duration timeout) {
        try {
            return e.awaitTermination(timeout);
        } finally { reachabilityFence(this); }
    }

    /** Submits a Runnable to the wrapped executor and returns its future. */
    Future!void submit(Runnable task) {
        try {
            return e.submit(task);
        } finally { reachabilityFence(this); }
    }

    /** Submits a Callable to the wrapped executor and returns its future. */
    Future!(T) submit(T)(Callable!(T) task) {
        try {
            return e.submit(task);
        } finally { reachabilityFence(this); }
    }

    /** Submits a Runnable together with a result value to the wrapped executor. */
    Future!(T) submit(T)(Runnable task, T result) {
        try {
            return e.submit(task, result);
        } finally { reachabilityFence(this); }
    }

    /** Forwards the whole task collection to the wrapped executor. */
    List!(Future!(T)) invokeAll(T)(Collection!(Callable!(T)) tasks) {
        try {
            return e.invokeAll(tasks);
        } finally { reachabilityFence(this); }
    }

    /**
     * Forwards the task collection and the timeout to the wrapped executor.
     * The timeout is a single Duration; there is no separate time-unit
     * argument to pass along.
     */
    List!(Future!(T)) invokeAll(T)(Collection!(Callable!(T)) tasks,
                                         Duration timeout) {
        try {
            return e.invokeAll(tasks, timeout);
        } finally { reachabilityFence(this); }
    }

    /** Forwards the task collection to the wrapped executor's invokeAny. */
    T invokeAny(T)(Collection!(Callable!(T)) tasks) {
        try {
            return e.invokeAny(tasks);
        } finally { reachabilityFence(this); }
    }

    /**
     * Forwards the task collection and the timeout to the wrapped
     * executor's invokeAny. The timeout is a single Duration; there is no
     * separate time-unit argument to pass along.
     */
    T invokeAny(T)(Collection!(Callable!(T)) tasks,
                           Duration timeout) {
        try {
            return e.invokeAny(tasks, timeout);
        } finally { reachabilityFence(this); }
    }
}
970 
/**
 * A DelegatedExecutorService that adds a finalize() method which shuts
 * the wrapped executor down.
 *
 * NOTE(review): nothing in the visible code calls finalize(). D runs
 * destructors (~this) rather than a method with this name, so confirm
 * whether this hook is ever invoked.
 */
private class FinalizableDelegatedExecutorService(T) : DelegatedExecutorService!T {
    this(T executor) {
        super(executor);
    }

    // Calls the base class shutdown(), which forwards to the wrapped executor.
    protected void finalize() {
        super.shutdown();
    }
}
980 
981 /**
982  * A wrapper class that exposes only the ScheduledExecutorService
983  * methods of a ScheduledExecutorService implementation.
984  */
private class DelegatedScheduledExecutorService(T) : DelegatedExecutorService!T,
        ScheduledExecutorService if(is(T : ScheduledExecutorService)){

    // Own reference to the wrapped executor, used by the scheduling methods
    // below. The same executor is also handed to the base class constructor.
    private T e;

    this(T executor) {
        super(executor);
        e = executor;
    }

    // Forwards a one-shot Runnable with its delay to the wrapped executor.
    ScheduledFuture!void schedule(Runnable command, Duration delay) {
        return e.schedule(command, delay);
    }

    // Forwards a one-shot Callable with its delay to the wrapped executor.
    ScheduledFuture!(V) schedule(V)(Callable!(V) callable, Duration delay) {
        return e.schedule!V(callable, delay);
    }

    // Forwards to the wrapped executor's scheduleAtFixedRate unchanged.
    ScheduledFuture!void scheduleAtFixedRate(Runnable command, Duration initialDelay, Duration period) {
        return e.scheduleAtFixedRate(command, initialDelay, period);
    }

    // Forwards to the wrapped executor's scheduleWithFixedDelay unchanged.
    ScheduledFuture!void scheduleWithFixedDelay(Runnable command, Duration initialDelay, Duration delay) {
        return e.scheduleWithFixedDelay(command, initialDelay, delay);
    }
}