1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 module hunt.concurrency.ForkJoinPool;
37 
38 import hunt.concurrency.AbstractExecutorService;
39 import hunt.concurrency.atomic.AtomicHelper;
40 import hunt.concurrency.Exceptions;
41 import hunt.concurrency.ForkJoinTask;
42 import hunt.concurrency.ForkJoinTaskHelper;
43 import hunt.concurrency.ThreadLocalRandom;
44 import hunt.concurrency.thread;
45 
46 import hunt.collection.List;
47 import hunt.collection.Collection;
48 import hunt.collection.Collections;
49 import hunt.logging.ConsoleLogger;
50 import hunt.Exceptions;
51 import hunt.Functions;
52 import hunt.system.Environment;
53 import hunt.system.Memory;
54 import hunt.util.Common;
55 import hunt.util.Configuration;
56 import hunt.util.DateTime;
57 import hunt.util.Runnable;
58 
59 import core.atomic;
60 import core.sync.mutex;
61 import core.thread;
62 import core.time;
63 
64 import std.algorithm;
65 import std.array;
66 import std.conv;
67 import std.datetime;
68 
69 alias ReentrantLock = Mutex;
70 
71 // import java.lang.Thread.UncaughtExceptionHandler;
72 // import java.lang.invoke.MethodHandles;
73 // import java.lang.invoke.VarHandle;
74 // import java.security.AccessController;
75 // import java.security.AccessControlContext;
76 // import java.security.Permission;
77 // import java.security.Permissions;
78 // import java.security.PrivilegedAction;
79 // import java.security.ProtectionDomain;
80 // import hunt.collection.ArrayList;
81 // import hunt.collection.Collection;
82 // import java.util.Collections;
83 // import java.util.List;
84 // import hunt.util.functional.Predicate;
85 // import hunt.concurrency.locks.LockSupport;
86 
87 
private {

// Constants shared across ForkJoinPool and WorkQueue.
// All are compile-time manifest constants (D `enum`), so they occupy no
// storage and are inlined at every use site.

// Bounds
enum int SWIDTH       = 16;            // width of short (bits used per packed subfield)
enum int SMASK        = 0xffff;        // short bits == max index
enum int MAX_CAP      = 0x7fff;        // max #workers - 1
enum int SQMASK       = 0x007e;        // max 64 (even) slots; even indices hold submission queues

// Masks and units for WorkQueue.phase and ctl sp subfield
enum int UNSIGNALLED  = 1 << 31;       // must be negative (sign bit doubles as the flag)
enum int SS_SEQ       = 1 << 16;       // version count unit, guards against Treiber-stack ABA
enum int QLOCK        = 1;             // must be 1 (spinlock bit in WorkQueue.phase)

// Mode bits and sentinels, some also used in WorkQueue id and .source fields
enum int OWNED        = 1;             // queue has owner thread
enum int FIFO         = 1 << 16;       // fifo queue or access mode
enum int SHUTDOWN     = 1 << 18;       // pool lifecycle: shutdown requested
enum int TERMINATED   = 1 << 19;       // pool lifecycle: termination complete
enum int STOP         = 1 << 31;       // must be negative (abrupt termination phase)
enum int QUIET        = 1 << 30;       // not scanning or working
enum int DORMANT      = QUIET | UNSIGNALLED;  // idle worker: quiet and awaiting signal

/**
 * The maximum number of top-level polls per worker before
 * checking other queues, expressed as a bit shift to, in effect,
 * multiply by pool size, and then use as random value mask, so
 * average bound is about poolSize*(1<<TOP_BOUND_SHIFT).  See
 * above for rationale.
 */
enum int TOP_BOUND_SHIFT = 10;

/**
 * Initial capacity of work-stealing queue array.
 * Must be a power of two, at least 2.
 */
enum int INITIAL_QUEUE_CAPACITY = 1 << 13;

/**
 * Maximum capacity for queue arrays. Must be a power of two less
 * than or equal to 1 << (31 - width of array entry) to ensure
 * lack of wraparound of index calculations, but defined to a
 * value a bit less than this to help users trap runaway programs
 * before saturating systems.
 */
enum int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

}
137 
138 /**
139  * An {@link ExecutorService} for running {@link ForkJoinTask}s.
140  * A {@code ForkJoinPool} provides the entry point for submissions
141  * from non-{@code ForkJoinTask} clients, as well as management and
142  * monitoring operations.
143  *
144  * <p>A {@code ForkJoinPool} differs from other kinds of {@link
145  * ExecutorService} mainly by virtue of employing
146  * <em>work-stealing</em>: all threads in the pool attempt to find and
147  * execute tasks submitted to the pool and/or created by other active
148  * tasks (eventually blocking waiting for work if none exist). This
149  * enables efficient processing when most tasks spawn other subtasks
150  * (as do most {@code ForkJoinTask}s), as well as when many small
151  * tasks are submitted to the pool from external clients.  Especially
152  * when setting <em>asyncMode</em> to true in constructors, {@code
153  * ForkJoinPool}s may also be appropriate for use with event-style
154  * tasks that are never joined. All worker threads are initialized
155  * with {@link Thread#isDaemon} set {@code true}.
156  *
157  * <p>A static {@link #commonPool()} is available and appropriate for
158  * most applications. The common pool is used by any ForkJoinTask that
159  * is not explicitly submitted to a specified pool. Using the common
160  * pool normally reduces resource usage (its threads are slowly
161  * reclaimed during periods of non-use, and reinstated upon subsequent
162  * use).
163  *
164  * <p>For applications that require separate or custom pools, a {@code
165  * ForkJoinPool} may be constructed with a given target parallelism
166  * level; by default, equal to the number of available processors.
167  * The pool attempts to maintain enough active (or available) threads
168  * by dynamically adding, suspending, or resuming internal worker
169  * threads, even if some tasks are stalled waiting to join others.
170  * However, no such adjustments are guaranteed in the face of blocked
171  * I/O or other unmanaged synchronization. The nested {@link
172  * ManagedBlocker} interface enables extension of the kinds of
173  * synchronization accommodated. The default policies may be
174  * overridden using a constructor with parameters corresponding to
175  * those documented in class {@link ThreadPoolExecutor}.
176  *
177  * <p>In addition to execution and lifecycle control methods, this
178  * class provides status check methods (for example
179  * {@link #getStealCount}) that are intended to aid in developing,
180  * tuning, and monitoring fork/join applications. Also, method
181  * {@link #toString} returns indications of pool state in a
182  * convenient form for informal monitoring.
183  *
184  * <p>As is the case with other ExecutorServices, there are three
185  * main task execution methods summarized in the following table.
186  * These are designed to be used primarily by clients not already
187  * engaged in fork/join computations in the current pool.  The main
188  * forms of these methods accept instances of {@code ForkJoinTask},
189  * but overloaded forms also allow mixed execution of plain {@code
190  * Runnable}- or {@code Callable}- based activities as well.  However,
191  * tasks that are already executing in a pool should normally instead
192  * use the within-computation forms listed in the table unless using
193  * async event-style tasks that are not usually joined, in which case
194  * there is little difference among choice of methods.
195  *
196  * <table class="plain">
197  * <caption>Summary of task execution methods</caption>
198  *  <tr>
199  *    <td></td>
200  *    <th scope="col"> Call from non-fork/join clients</th>
201  *    <th scope="col"> Call from within fork/join computations</th>
202  *  </tr>
203  *  <tr>
204  *    <th scope="row" style="text-align:left"> Arrange async execution</th>
205  *    <td> {@link #execute(ForkJoinTask)}</td>
206  *    <td> {@link ForkJoinTask#fork}</td>
207  *  </tr>
208  *  <tr>
209  *    <th scope="row" style="text-align:left"> Await and obtain result</th>
210  *    <td> {@link #invoke(ForkJoinTask)}</td>
211  *    <td> {@link ForkJoinTask#invoke}</td>
212  *  </tr>
213  *  <tr>
214  *    <th scope="row" style="text-align:left"> Arrange exec and obtain Future</th>
215  *    <td> {@link #submit(ForkJoinTask)}</td>
216  *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
217  *  </tr>
218  * </table>
219  *
220  * <p>The parameters used to construct the common pool may be controlled by
221  * setting the following {@linkplain System#getProperty system properties}:
222  * <ul>
223  * <li>{@code hunt.concurrency.ForkJoinPool.common.parallelism}
224  * - the parallelism level, a non-negative integer
225  * <li>{@code hunt.concurrency.ForkJoinPool.common.threadFactory}
226  * - the class name of a {@link ForkJoinWorkerThreadFactory}.
227  * The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
228  * is used to load this class.
229  * <li>{@code hunt.concurrency.ForkJoinPool.common.exceptionHandler}
230  * - the class name of a {@link UncaughtExceptionHandler}.
231  * The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
232  * is used to load this class.
233  * <li>{@code hunt.concurrency.ForkJoinPool.common.maximumSpares}
234  * - the maximum number of allowed extra threads to maintain target
235  * parallelism (default 256).
236  * </ul>
237  * If no thread factory is supplied via a system property, then the
238  * common pool uses a factory that uses the system class loader as the
239  * {@linkplain Thread#getContextClassLoader() thread context class loader}.
240  * In addition, if a {@link SecurityManager} is present, then
241  * the common pool uses a factory supplying threads that have no
242  * {@link Permissions} enabled.
243  *
244  * Upon any error in establishing these settings, default parameters
245  * are used. It is possible to disable or limit the use of threads in
246  * the common pool by setting the parallelism property to zero, and/or
247  * using a factory that may return {@code null}. However doing so may
248  * cause unjoined tasks to never be executed.
249  *
250  * <p><b>Implementation notes</b>: This implementation restricts the
251  * maximum number of running threads to 32767. Attempts to create
252  * pools with greater than the maximum number result in
253  * {@code IllegalArgumentException}.
254  *
255  * <p>This implementation rejects submitted tasks (that is, by throwing
256  * {@link RejectedExecutionException}) only when the pool is shut down
257  * or internal resources have been exhausted.
258  *
259  * @author Doug Lea
260  */
261 class ForkJoinPool : AbstractExecutorService { 
262 
263     /*
264      * Implementation Overview
265      *
266      * This class and its nested classes provide the main
267      * functionality and control for a set of worker threads:
268      * Submissions from non-FJ threads enter into submission queues.
269      * Workers take these tasks and typically split them into subtasks
270      * that may be stolen by other workers. Work-stealing based on
271      * randomized scans generally leads to better throughput than
272      * "work dealing" in which producers assign tasks to idle threads,
273      * in part because threads that have finished other tasks before
274      * the signalled thread wakes up (which can be a long time) can
275      * take the task instead.  Preference rules give first priority to
276      * processing tasks from their own queues (LIFO or FIFO, depending
277      * on mode), then to randomized FIFO steals of tasks in other
278      * queues.  This framework began as vehicle for supporting
279      * tree-structured parallelism using work-stealing.  Over time,
280      * its scalability advantages led to extensions and changes to
281      * better support more diverse usage contexts.  Because most
282      * internal methods and nested classes are interrelated, their
283      * main rationale and descriptions are presented here; individual
284      * methods and nested classes contain only brief comments about
285      * details.
286      *
287      * WorkQueues
288      * ==========
289      *
290      * Most operations occur within work-stealing queues (in nested
291      * class WorkQueue).  These are special forms of Deques that
292      * support only three of the four possible end-operations -- push,
293      * pop, and poll (aka steal), under the further constraints that
294      * push and pop are called only from the owning thread (or, as
295      * extended here, under a lock), while poll may be called from
296      * other threads.  (If you are unfamiliar with them, you probably
297      * want to read Herlihy and Shavit's book "The Art of
298      * Multiprocessor programming", chapter 16 describing these in
299      * more detail before proceeding.)  The main work-stealing queue
300      * design is roughly similar to those in the papers "Dynamic
301      * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
302      * (http://research.sun.com/scalable/pubs/index.html) and
303      * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
304      * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
305      * The main differences ultimately stem from GC requirements that
306      * we null out taken slots as soon as we can, to maintain as small
307      * a footprint as possible even in programs generating huge
308      * numbers of tasks. To accomplish this, we shift the CAS
309      * arbitrating pop vs poll (steal) from being on the indices
310      * ("base" and "top") to the slots themselves.
311      *
312      * Adding tasks then takes the form of a classic array push(task)
313      * in a circular buffer:
314      *    q.array[q.top++ % length] = task;
315      *
316      * (The actual code needs to null-check and size-check the array,
317      * uses masking, not mod, for indexing a power-of-two-sized array,
318      * adds a release fence for publication, and possibly signals
319      * waiting workers to start scanning -- see below.)  Both a
320      * successful pop and poll mainly entail a CAS of a slot from
321      * non-null to null.
322      *
323      * The pop operation (always performed by owner) is:
324      *   if ((the task at top slot is not null) and
325      *        (CAS slot to null))
326      *           decrement top and return task;
327      *
328      * And the poll operation (usually by a stealer) is
329      *    if ((the task at base slot is not null) and
330      *        (CAS slot to null))
331      *           increment base and return task;
332      *
333      * There are several variants of each of these. Most uses occur
334      * within operations that also interleave contention or emptiness
335      * tracking or inspection of elements before extracting them, so
336      * must interleave these with the above code. When performed by
337      * owner, getAndSet is used instead of CAS (see for example method
338      * nextLocalTask) which is usually more efficient, and possible
339      * because the top index cannot independently change during the
340      * operation.
341      *
342      * Memory ordering.  See "Correct and Efficient Work-Stealing for
343      * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
344      * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
345      * analysis of memory ordering requirements in work-stealing
346      * algorithms similar to (but different than) the one used here.
347      * Extracting tasks in array slots via (fully fenced) CAS provides
348      * primary synchronization. The base and top indices imprecisely
349      * guide where to extract from. We do not usually require strict
350      * orderings of array and index updates. Many index accesses use
351      * plain mode, with ordering constrained by surrounding context
352      * (usually with respect to element CASes or the two WorkQueue
353      * fields source and phase). When not otherwise already
354      * constrained, reads of "base" by queue owners use acquire-mode,
355      * and some externally callable methods preface accesses with
356      * acquire fences.  Additionally, to ensure that index update
357      * writes are not coalesced or postponed in loops etc, "opaque"
358      * mode is used in a few cases where timely writes are not
359      * otherwise ensured. The "locked" versions of push- and pop-
360      * based methods for shared queues differ from owned versions
361      * because locking already forces some of the ordering.
362      *
363      * Because indices and slot contents cannot always be consistent,
364      * a check that base == top indicates (momentary) emptiness, but
365      * otherwise may err on the side of possibly making the queue
366      * appear nonempty when a push, pop, or poll have not fully
367      * committed, or making it appear empty when an update of top has
368      * not yet been visibly written.  (Method isEmpty() checks the
369      * case of a partially completed removal of the last element.)
370      * Because of this, the poll operation, considered individually,
371      * is not wait-free. One thief cannot successfully continue until
372      * another in-progress one (or, if previously empty, a push)
373      * visibly completes.  This can stall threads when required to
374      * consume from a given queue (see method poll()).  However, in
375      * the aggregate, we ensure at least probabilistic
376      * non-blockingness.  If an attempted steal fails, a scanning
377      * thief chooses a different random victim target to try next. So,
378      * in order for one thief to progress, it suffices for any
379      * in-progress poll or new push on any empty queue to complete.
380      *
381      * This approach also enables support of a user mode in which
382      * local task processing is in FIFO, not LIFO order, simply by
383      * using poll rather than pop.  This can be useful in
384      * message-passing frameworks in which tasks are never joined.
385      *
386      * WorkQueues are also used in a similar way for tasks submitted
387      * to the pool. We cannot mix these tasks in the same queues used
388      * by workers. Instead, we randomly associate submission queues
389      * with submitting threads, using a form of hashing.  The
390      * ThreadLocalRandom probe value serves as a hash code for
391      * choosing existing queues, and may be randomly repositioned upon
392      * contention with other submitters.  In essence, submitters act
393      * like workers except that they are restricted to executing local
394      * tasks that they submitted.  Insertion of tasks in shared mode
395      * requires a lock but we use only a simple spinlock (using field
396      * phase), because submitters encountering a busy queue move to a
397      * different position to use or create other queues -- they block
398      * only when creating and registering new queues. Because it is
399      * used only as a spinlock, unlocking requires only a "releasing"
400      * store (using setRelease) unless otherwise signalling.
401      *
402      * Management
403      * ==========
404      *
405      * The main throughput advantages of work-stealing stem from
406      * decentralized control -- workers mostly take tasks from
407      * themselves or each other, at rates that can exceed a billion
408      * per second.  The pool itself creates, activates (enables
409      * scanning for and running tasks), deactivates, blocks, and
410      * terminates threads, all with minimal central information.
411      * There are only a few properties that we can globally track or
412      * maintain, so we pack them into a small number of variables,
413      * often maintaining atomicity without blocking or locking.
414      * Nearly all essentially atomic control state is held in a few
415      * variables that are by far most often read (not
416      * written) as status and consistency checks. We pack as much
417      * information into them as we can.
418      *
419      * Field "ctl" contains 64 bits holding information needed to
420      * atomically decide to add, enqueue (on an event queue), and
421      * dequeue and release workers.  To enable this packing, we
422      * restrict maximum parallelism to (1<<15)-1 (which is far in
423      * excess of normal operating range) to allow ids, counts, and
424      * their negations (used for thresholding) to fit into 16bit
425      * subfields.
426      *
427      * Field "mode" holds configuration parameters as well as lifetime
428      * status, atomically and monotonically setting SHUTDOWN, STOP,
429      * and finally TERMINATED bits.
430      *
431      * Field "workQueues" holds references to WorkQueues.  It is
432      * updated (only during worker creation and termination) under
433      * lock (using field workerNamePrefix as lock), but is otherwise
434      * concurrently readable, and accessed directly. We also ensure
435      * that uses of the array reference itself never become too stale
436      * in case of resizing, by arranging that (re-)reads are separated
437      * by at least one acquiring read access.  To simplify index-based
438      * operations, the array size is always a power of two, and all
439      * readers must tolerate null slots. Worker queues are at odd
440      * indices. Shared (submission) queues are at even indices, up to
441      * a maximum of 64 slots, to limit growth even if the array needs
442      * to expand to add more workers. Grouping them together in this
443      * way simplifies and speeds up task scanning.
444      *
445      * All worker thread creation is on-demand, triggered by task
446      * submissions, replacement of terminated workers, and/or
447      * compensation for blocked workers. However, all other support
448      * code is set up to work with other policies.  To ensure that we
449      * do not hold on to worker references that would prevent GC, all
450      * accesses to workQueues are via indices into the workQueues
451      * array (which is one source of some of the messy code
452      * constructions here). In essence, the workQueues array serves as
453      * a weak reference mechanism. Thus for example the stack top
454      * subfield of ctl stores indices, not references.
455      *
456      * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
457      * cannot let workers spin indefinitely scanning for tasks when
458      * none can be found immediately, and we cannot start/resume
459      * workers unless there appear to be tasks available.  On the
460      * other hand, we must quickly prod them into action when new
461      * tasks are submitted or generated. In many usages, ramp-up time
462      * is the main limiting factor in overall performance, which is
463      * compounded at program start-up by JIT compilation and
464      * allocation. So we streamline this as much as possible.
465      *
466      * The "ctl" field atomically maintains total worker and
467      * "released" worker counts, plus the head of the available worker
468      * queue (actually stack, represented by the lower 32bit subfield
469      * of ctl).  Released workers are those known to be scanning for
470      * and/or running tasks. Unreleased ("available") workers are
471      * recorded in the ctl stack. These workers are made available for
472      * signalling by enqueuing in ctl (see method runWorker).  The
473      * "queue" is a form of Treiber stack. This is ideal for
474      * activating threads in most-recently used order, and improves
475      * performance and locality, outweighing the disadvantages of
476      * being prone to contention and inability to release a worker
477      * unless it is topmost on stack.  To avoid missed signal problems
478      * inherent in any wait/signal design, available workers rescan
479      * for (and if found run) tasks after enqueuing.  Normally their
480      * release status will be updated while doing so, but the released
481      * worker ctl count may underestimate the number of active
482      * threads. (However, it is still possible to determine quiescence
483      * via a validation traversal -- see isQuiescent).  After an
484      * unsuccessful rescan, available workers are blocked until
485      * signalled (see signalWork).  The top stack state holds the
486      * value of the "phase" field of the worker: its index and status,
487      * plus a version counter that, in addition to the count subfields
488      * (also serving as version stamps) provide protection against
489      * Treiber stack ABA effects.
490      *
491      * Creating workers. To create a worker, we pre-increment counts
492      * (serving as a reservation), and attempt to construct a
493      * ForkJoinWorkerThread via its factory. Upon construction, the
494      * new thread invokes registerWorker, where it constructs a
495      * WorkQueue and is assigned an index in the workQueues array
496      * (expanding the array if necessary). The thread is then started.
497      * Upon any exception across these steps, or null return from
498      * factory, deregisterWorker adjusts counts and records
499      * accordingly.  If a null return, the pool continues running with
500      * fewer than the target number workers. If exceptional, the
501      * exception is propagated, generally to some external caller.
502      * Worker index assignment avoids the bias in scanning that would
503      * occur if entries were sequentially packed starting at the front
504      * of the workQueues array. We treat the array as a simple
505      * power-of-two hash table, expanding as needed. The seedIndex
506      * increment ensures no collisions until a resize is needed or a
507      * worker is deregistered and replaced, and thereafter keeps
508      * probability of collision low. We cannot use
509      * ThreadLocalRandom.getProbe() for similar purposes here because
510      * the thread has not started yet, but do so for creating
511      * submission queues for existing external threads (see
512      * externalPush).
513      *
514      * WorkQueue field "phase" is used by both workers and the pool to
515      * manage and track whether a worker is UNSIGNALLED (possibly
516      * blocked waiting for a signal).  When a worker is enqueued its
517      * phase field is set. Note that phase field updates lag queue CAS
518      * releases so usage requires care -- seeing a negative phase does
519      * not guarantee that the worker is available. When queued, the
520      * lower 16 bits of scanState must hold its pool index. So we
521      * place the index there upon initialization and otherwise keep it
522      * there or restore it when necessary.
523      *
524      * The ctl field also serves as the basis for memory
525      * synchronization surrounding activation. This uses a more
526      * efficient version of a Dekker-like rule that task producers and
527      * consumers sync with each other by both writing/CASing ctl (even
528      * if to its current value).  This would be extremely costly. So
529      * we relax it in several ways: (1) Producers only signal when
530      * their queue is possibly empty at some point during a push
531      * operation (which requires conservatively checking size zero or
532      * one to cover races). (2) Other workers propagate this signal
533      * when they find tasks in a queue with size greater than one. (3)
534      * Workers only enqueue after scanning (see below) and not finding
535      * any tasks.  (4) Rather than CASing ctl to its current value in
536      * the common case where no action is required, we reduce write
537      * contention by equivalently prefacing signalWork when called by
538      * an external task producer using a memory access with
539      * full-semantics or a "fullFence".
540      *
541      * Almost always, too many signals are issued, in part because a
542      * task producer cannot tell if some existing worker is in the
543      * midst of finishing one task (or already scanning) and ready to
544      * take another without being signalled. So the producer might
545      * instead activate a different worker that does not find any
546      * work, and then inactivates. This scarcely matters in
547      * steady-state computations involving all workers, but can create
548      * contention and bookkeeping bottlenecks during ramp-up,
549      * ramp-down, and small computations involving only a few workers.
550      *
551      * Scanning. Method scan (from runWorker) performs top-level
552      * scanning for tasks. (Similar scans appear in helpQuiesce and
553      * pollScan.)  Each scan traverses and tries to poll from each
554      * queue starting at a random index. Scans are not performed in
555      * ideal random permutation order, to reduce cacheline
556      * contention. The pseudorandom generator need not have
557      * high-quality statistical properties in the long term, but just
558      * within computations; We use Marsaglia XorShifts (often via
559      * ThreadLocalRandom.nextSecondarySeed), which are cheap and
560      * suffice. Scanning also includes contention reduction: When
561      * scanning workers fail to extract an apparently existing task,
562      * they soon restart at a different pseudorandom index.  This form
563      * of backoff improves throughput when many threads are trying to
564      * take tasks from few queues, which can be common in some usages.
     * Scans do not otherwise explicitly take into account core
     * affinities, loads, cache localities, etc. However, they do
567      * exploit temporal locality (which usually approximates these) by
568      * preferring to re-poll from the same queue after a successful
569      * poll before trying others (see method topLevelExec). However
570      * this preference is bounded (see TOP_BOUND_SHIFT) as a safeguard
571      * against infinitely unfair looping under unbounded user task
572      * recursion, and also to reduce long-term contention when many
573      * threads poll few queues holding many small tasks. The bound is
574      * high enough to avoid much impact on locality and scheduling
575      * overhead.
576      *
577      * Trimming workers. To release resources after periods of lack of
578      * use, a worker starting to wait when the pool is quiescent will
579      * time out and terminate (see method runWorker) if the pool has
580      * remained quiescent for period given by field keepAlive.
581      *
582      * Shutdown and Termination. A call to shutdownNow invokes
583      * tryTerminate to atomically set a runState bit. The calling
584      * thread, as well as every other worker thereafter terminating,
585      * helps terminate others by cancelling their unprocessed tasks,
586      * and waking them up, doing so repeatedly until stable. Calls to
587      * non-abrupt shutdown() preface this by checking whether
588      * termination should commence by sweeping through queues (until
589      * stable) to ensure lack of in-flight submissions and workers
590      * about to process them before triggering the "STOP" phase of
591      * termination.
592      *
593      * Joining Tasks
594      * =============
595      *
596      * Any of several actions may be taken when one worker is waiting
597      * to join a task stolen (or always held) by another.  Because we
598      * are multiplexing many tasks on to a pool of workers, we can't
599      * always just let them block (as in Thread.join).  We also cannot
600      * just reassign the joiner's run-time stack with another and
601      * replace it later, which would be a form of "continuation", that
602      * even if possible is not necessarily a good idea since we may
603      * need both an unblocked task and its continuation to progress.
604      * Instead we combine two tactics:
605      *
606      *   Helping: Arranging for the joiner to execute some task that it
607      *      would be running if the steal had not occurred.
608      *
609      *   Compensating: Unless there are already enough live threads,
610      *      method tryCompensate() may create or re-activate a spare
611      *      thread to compensate for blocked joiners until they unblock.
612      *
613      * A third form (implemented in tryRemoveAndExec) amounts to
614      * helping a hypothetical compensator: If we can readily tell that
615      * a possible action of a compensator is to steal and execute the
616      * task being joined, the joining thread can do so directly,
617      * without the need for a compensation thread.
618      *
619      * The ManagedBlocker extension API can't use helping so relies
620      * only on compensation in method awaitBlocker.
621      *
622      * The algorithm in awaitJoin entails a form of "linear helping".
623      * Each worker records (in field source) the id of the queue from
624      * which it last stole a task.  The scan in method awaitJoin uses
625      * these markers to try to find a worker to help (i.e., steal back
626      * a task from and execute it) that could hasten completion of the
627      * actively joined task.  Thus, the joiner executes a task that
628      * would be on its own local deque if the to-be-joined task had
629      * not been stolen. This is a conservative variant of the approach
630      * described in Wagner & Calder "Leapfrogging: a portable
631      * technique for implementing efficient futures" SIGPLAN Notices,
632      * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
633      * mainly in that we only record queue ids, not full dependency
634      * links.  This requires a linear scan of the workQueues array to
635      * locate stealers, but isolates cost to when it is needed, rather
     * than adding to per-task overhead. Searches can fail to locate
     * stealers when GC stalls and the like delay recording sources.
638      * Further, even when accurately identified, stealers might not
639      * ever produce a task that the joiner can in turn help with. So,
640      * compensation is tried upon failure to find tasks to run.
641      *
642      * Compensation does not by default aim to keep exactly the target
643      * parallelism number of unblocked threads running at any given
644      * time. Some previous versions of this class employed immediate
645      * compensations for any blocked join. However, in practice, the
646      * vast majority of blockages are byproducts of GC and
647      * other JVM or OS activities that are made worse by replacement
648      * when they cause longer-term oversubscription.  Rather than
649      * impose arbitrary policies, we allow users to override the
650      * default of only adding threads upon apparent starvation.  The
651      * compensation mechanism may also be bounded.  Bounds for the
652      * commonPool (see COMMON_MAX_SPARES) better enable JVMs to cope
653      * with programming errors and abuse before running out of
654      * resources to do so.
655      *
656      * Common Pool
657      * ===========
658      *
659      * The static common pool always exists after static
660      * initialization.  Since it (or any other created pool) need
661      * never be used, we minimize initial construction overhead and
662      * footprint to the setup of about a dozen fields.
663      *
664      * When external threads submit to the common pool, they can
665      * perform subtask processing (see externalHelpComplete and
666      * related methods) upon joins.  This caller-helps policy makes it
667      * sensible to set common pool parallelism level to one (or more)
668      * less than the total number of available cores, or even zero for
669      * pure caller-runs.  We do not need to record whether external
670      * submissions are to the common pool -- if not, external help
671      * methods return quickly. These submitters would otherwise be
672      * blocked waiting for completion, so the extra effort (with
673      * liberally sprinkled task status checks) in inapplicable cases
674      * amounts to an odd form of limited spin-wait before blocking in
675      * ForkJoinTask.join.
676      *
677      * As a more appropriate default in managed environments, unless
678      * overridden by system properties, we use workers of subclass
679      * InnocuousForkJoinWorkerThread when there is a SecurityManager
680      * present. These workers have no permissions set, do not belong
681      * to any user-defined ThreadGroupEx, and erase all ThreadLocals
682      * after executing any top-level task (see
683      * WorkQueue.afterTopLevelExec).  The associated mechanics (mainly
684      * in ForkJoinWorkerThread) may be JVM-dependent and must access
685      * particular Thread class fields to achieve this effect.
686      *
687      * Memory placement
688      * ================
689      *
690      * Performance can be very sensitive to placement of instances of
691      * ForkJoinPool and WorkQueues and their queue arrays. To reduce
692      * false-sharing impact, the @Contended annotation isolates
693      * adjacent WorkQueue instances, as well as the ForkJoinPool.ctl
694      * field. WorkQueue arrays are allocated (by their threads) with
695      * larger initial sizes than most ever need, mostly to reduce
696      * false sharing with current garbage collectors that use cardmark
697      * tables.
698      *
699      * Style notes
700      * ===========
701      *
702      * Memory ordering relies mainly on VarHandles.  This can be
703      * awkward and ugly, but also reflects the need to control
704      * outcomes across the unusual cases that arise in very racy code
705      * with very few invariants. All fields are read into locals
706      * before use, and null-checked if they are references.  Array
707      * accesses using masked indices include checks (that are always
708      * true) that the array length is non-zero to avoid compilers
709      * inserting more expensive traps.  This is usually done in a
710      * "C"-like style of listing declarations at the heads of methods
711      * or blocks, and using inline assignments on first encounter.
712      * Nearly all explicit checks lead to bypass/return, not exception
713      * throws, because they may legitimately arise due to
714      * cancellation/revocation during shutdown.
715      *
716      * There is a lot of representation-level coupling among classes
717      * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask.  The
718      * fields of WorkQueue maintain data structures managed by
719      * ForkJoinPool, so are directly accessed.  There is little point
720      * trying to reduce this, since any associated future changes in
721      * representations will need to be accompanied by algorithmic
722      * changes anyway. Several methods intrinsically sprawl because
723      * they must accumulate sets of consistent reads of fields held in
724      * local variables. Some others are artificially broken up to
725      * reduce producer/consumer imbalances due to dynamic compilation.
726      * There are also other coding oddities (including several
727      * unnecessary-looking hoisted null checks) that help some methods
728      * perform reasonably even when interpreted (not compiled).
729      *
730      * The order of declarations in this file is (with a few exceptions):
731      * (1) Static utility functions
732      * (2) Nested (static) classes
733      * (3) Static fields
734      * (4) Fields, along with constants used when unpacking some of them
735      * (5) Internal control methods
736      * (6) Callbacks and other support for ForkJoinTask methods
737      * (7) Exported methods
738      * (8) Static block initializing statics in minimally dependent order
739      */
740 
741     // Static utilities
742 
743     /**
744      * If there is a security manager, makes sure caller has
745      * permission to modify threads.
746      */
747     // private static void checkPermission() {
748     //     SecurityManager security = System.getSecurityManager();
749     //     if (security !is null)
750     //         security.checkPermission(modifyThreadPermission);
751     // }
752 
753 
    // static fields (initialized in static initializer below)

    /**
     * Creates a new ForkJoinWorkerThread. This factory is used unless
     * overridden in ForkJoinPool constructors.
     */
    __gshared ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;

    /**
     * Permission required for callers of methods that may start or
     * kill threads.
     * (Disabled in this port: no SecurityManager equivalent in D.)
     */
    // __gshared RuntimePermission modifyThreadPermission;

    /**
     * Common (static) pool. Non-null for use unless a static
     * construction exception, but internal usages null-check on use
     * to defensively avoid potential initialization circularities
     * as well as to simplify generated code.
     */
    __gshared ForkJoinPool common;

    /**
     * Common pool parallelism. To allow simpler use and management
     * when common pool threads are disabled, we allow the underlying
     * common.parallelism field to be zero, but in that case still report
     * parallelism as 1 to reflect resulting caller-runs mechanics.
     */
    __gshared int COMMON_PARALLELISM;

    /**
     * Limit on spare thread construction in tryCompensate.
     */
    private __gshared int COMMON_MAX_SPARES;

    /**
     * Sequence number for creating workerNamePrefix.
     * Advanced atomically by nextPoolId().
     */
    private shared static int poolNumberSequence;
793 
794     /**
795      * Returns the next sequence number. We don't expect this to
796      * ever contend, so use simple builtin sync.
797      */
798     private static int nextPoolId() {
799         return AtomicHelper.increment(poolNumberSequence);
800     }
801 
    // static configuration constants

    /**
     * Default idle timeout value (in milliseconds) for the thread
     * triggering quiescence to park waiting for new work
     */
    private enum long DEFAULT_KEEPALIVE = 60_000L;

    /**
     * Undershoot tolerance (in milliseconds) for idle timeouts;
     * compared against the remaining time after parkUntil returns
     * (see runWorker).
     */
    private enum long TIMEOUT_SLOP = 20L;

    /**
     * The default value for COMMON_MAX_SPARES.  Overridable using the
     * "hunt.concurrency.ForkJoinPool.common.maximumSpares" system
     * property.  The default value is far in excess of normal
     * requirements, but also far short of MAX_CAP and typical OS
     * thread limits, so allows JVMs to catch misuse/abuse before
     * running out of resources needed to do so.
     */
    private enum int DEFAULT_COMMON_MAX_SPARES = 256;

    /**
     * Increment for seed generators. See class ThreadLocal for
     * explanation.
     */
    private enum int SEED_INCREMENT = 0x9e3779b9;

    /*
     * Bits and masks for field ctl, packed with 4 16 bit subfields:
     * RC: Number of released (unqueued) workers minus target parallelism
     * TC: Number of total workers minus target parallelism
     * SS: version count and status of top waiting thread
     * ID: poolIndex of top of Treiber stack of waiters
     *
     * When convenient, we can extract the lower 32 stack top bits
     * (including version bits) as sp=(int)ctl.  The offsets of counts
     * by the target parallelism and the positionings of fields makes
     * it possible to perform the most common checks via sign tests of
     * fields: When ac is negative, there are not enough unqueued
     * workers, when tc is negative, there are not enough total
     * workers.  When sp is non-zero, there are waiting workers.  To
     * deal with possibly negative fields, we use casts in and out of
     * "short" and/or signed shifts to maintain signedness.
     *
     * Because it occupies uppermost bits, we can add one release count
     * using getAndAddLong of RC_UNIT, rather than CAS, when returning
     * from a blocked join.  Other updates entail multiple subfields
     * and masking, requiring CAS.
     *
     * The limits packed in field "bounds" are also offset by the
     * parallelism level to make them comparable to the ctl rc and tc
     * fields.
     */

    // Lower and upper word masks
    private enum long SP_MASK    = 0xffffffffL;
    private enum long UC_MASK    = ~SP_MASK;

    // Release counts
    private enum int  RC_SHIFT   = 48;
    private enum long RC_UNIT    = 0x0001L << RC_SHIFT;
    private enum long RC_MASK    = 0xffffL << RC_SHIFT;

    // Total counts
    private enum int  TC_SHIFT   = 32;
    private enum long TC_UNIT    = 0x0001L << TC_SHIFT;
    private enum long TC_MASK    = 0xffffL << TC_SHIFT;
    private enum long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign bit of TC subfield; set while total workers < target parallelism
872 
    // Instance fields

    long stealCount;            // collects worker nsteals (updated under lock in deregisterWorker)
    Duration keepAlive;                // idle keep-alive Duration before dropping a worker (not raw millis; see runWorker)
    int indexSeed;                       // next worker index
    int bounds;                    // min, max threads packed as shorts
    int mode;                   // parallelism, runstate, queue mode
    WorkQueue[] workQueues;              // main registry
    string workerNamePrefix;       // for worker thread string; sync lock
    Object workerNameLocker;       // NOTE(review): declared but unused here; naming is synchronized on `this` — confirm intent
    ForkJoinWorkerThreadFactory factory;           // creates worker threads (see createWorker)
    UncaughtExceptionHandler ueh;  // per-worker UEH
    Predicate!(ForkJoinPool) saturate;             // optional saturation policy consulted by tryCompensate

    // @jdk.internal.vm.annotation.Contended("fjpctl") // segregate
    shared long ctl;                   // main pool control
889 
890     // Creating, registering and deregistering workers
891 
892     /**
893      * Tries to construct and start one worker. Assumes that total
894      * count has already been incremented as a reservation.  Invokes
895      * deregisterWorker on any failure.
896      *
897      * @return true if successful
898      */
899     private bool createWorker() {
900         ForkJoinWorkerThreadFactory fac = factory;
901         Throwable ex = null;
902         ForkJoinWorkerThread wt = null;
903         try {
904             if (fac !is null && (wt = fac.newThread(this)) !is null) {
905                 wt.start();
906                 return true;
907             }
908         } catch (Throwable rex) {
909             ex = rex;
910         }
911         deregisterWorker(wt, ex);
912         return false;
913     }
914 
915     /**
916      * Tries to add one worker, incrementing ctl counts before doing
917      * so, relying on createWorker to back out on failure.
918      *
919      * @param c incoming ctl value, with total count negative and no
920      * idle workers.  On CAS failure, c is refreshed and retried if
921      * this holds (otherwise, a new worker is not needed).
922      */
923     private void tryAddWorker(long c) {
924         do {
925             long nc = ((RC_MASK & (c + RC_UNIT)) |
926                        (TC_MASK & (c + TC_UNIT)));
927             if (ctl == c && AtomicHelper.compareAndSet(this.ctl, c, nc)) {
928                 // version(HUNT_CONCURRENCY_DEBUG) tracef("nc=%d, ctl=%d, c=%d", nc, ctl, c);
929                 createWorker();
930                 break;
931             }
932         } while (((c = ctl) & ADD_WORKER) != 0L && cast(int)c == 0);
933     }
934 
935     /**
936      * Callback from ForkJoinWorkerThread constructor to establish and
937      * record its WorkQueue.
938      *
939      * @param wt the worker thread
940      * @return the worker's queue
941      */
942     final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
943         UncaughtExceptionHandler handler;
944         wt.isDaemon(true);                             // configure thread
945         if ((handler = ueh) !is null)
946             wt.setUncaughtExceptionHandler(handler);
947         int tid = 0;                                    // for thread name
948         int idbits = mode & FIFO;
949         string prefix = workerNamePrefix;
950         WorkQueue w = new WorkQueue(this, wt);
951         if (prefix !is null) {
952             synchronized (this) {
953                 WorkQueue[] ws = workQueues; 
954                 int n;
955                 int s = indexSeed += SEED_INCREMENT;
956                 idbits |= (s & ~(SMASK | FIFO | DORMANT));
957                 if (ws !is null && (n = cast(int)ws.length) > 1) {
958                     int m = n - 1;
959                     tid = m & ((s << 1) | 1);           // odd-numbered indices
960                     for (int probes = n >>> 1;;) {      // find empty slot
961                         WorkQueue q;
962                         if ((q = ws[tid]) is null || q.phase == QUIET)
963                             break;
964                         else if (--probes == 0) {
965                             tid = n | 1;                // resize below
966                             break;
967                         }
968                         else
969                             tid = (tid + 2) & m;
970                     }
971                     w.phase = w.id = tid | idbits;      // now publishable
972 
973                     if (tid < n)
974                         ws[tid] = w;
975                     else {                              // expand array
976                         int an = n << 1;
977                         WorkQueue[] as = new WorkQueue[an];
978                         as[tid] = w;
979                         int am = an - 1;
980                         for (int j = 0; j < n; ++j) {
981                             WorkQueue v;                // copy external queue
982                             if ((v = ws[j]) !is null)    // position may change
983                                 as[v.id & am & SQMASK] = v;
984                             if (++j >= n)
985                                 break;
986                             as[j] = ws[j];              // copy worker
987                         }
988                         workQueues = as;
989                     }
990                 }
991             }
992             wt.name(prefix ~ tid.to!string());
993         }
994         return w;
995     }
996 
997     /**
998      * Final callback from terminating worker, as well as upon failure
999      * to construct or start a worker.  Removes record of worker from
1000      * array, and adjusts counts. If pool is shutting down, tries to
1001      * complete termination.
1002      *
1003      * @param wt the worker thread, or null if construction failed
1004      * @param ex the exception causing failure, or null if none
1005      */
1006     final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1007         WorkQueue w = null;
1008         int phase = 0;
1009         if (wt !is null && (w = wt.workQueue) !is null) {
1010             int wid = w.id;
1011             long ns = cast(long)w.nsteals & 0xffffffffL;
1012             if (!workerNamePrefix.empty()) {
1013                 synchronized (this) {
1014                     WorkQueue[] ws; size_t n, i;         // remove index from array
1015                     if ((ws = workQueues) !is null && (n = ws.length) > 0 &&
1016                         ws[i = wid & (n - 1)] == w)
1017                         ws[i] = null;
1018                     stealCount += ns;
1019                 }
1020             }
1021             phase = w.phase;
1022         }
1023         if (phase != QUIET) {                         // else pre-adjusted
1024             long c;                                   // decrement counts
1025             do {} while (!AtomicHelper.compareAndSet(this.ctl, c = ctl, 
1026                 ((RC_MASK & (c - RC_UNIT)) |
1027                 (TC_MASK & (c - TC_UNIT)) |
1028                 (SP_MASK & c))));
1029         }
1030         if (w !is null)
1031             w.cancelAll();                            // cancel remaining tasks
1032 
1033         if (!tryTerminate(false, false) &&            // possibly replace worker
1034             w !is null && w.array !is null)           // avoid repeated failures
1035             signalWork();
1036 
1037         if (ex !is null)                              // rethrow
1038             ForkJoinTaskHelper.rethrow(ex);
1039     }
1040 
1041     /**
1042      * Tries to create or release a worker if too few are running.
1043      */
1044     final void signalWork() {
1045         for (;;) {
1046             long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
1047             if ((c = ctl) >= 0L)                      // enough workers
1048                 break;
1049             else if ((sp = cast(int)c) == 0) {            // no idle workers
1050                 if ((c & ADD_WORKER) != 0L)           // too few workers
1051                     tryAddWorker(c);
1052                 break;
1053             }
1054             else if ((ws = workQueues) is null)
1055                 break;                                // unstarted/terminated
1056             else if (ws.length <= (i = sp & SMASK))
1057                 break;                                // terminated
1058             else if ((v = ws[i]) is null)
1059                 break;                                // terminating
1060             else {
1061                 int np = sp & ~UNSIGNALLED;
1062                 int vp = v.phase;
1063                 long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
1064                 Thread vt = v.owner;
1065                 if (sp == vp && AtomicHelper.compareAndSet(this.ctl, c, nc)) {
1066                     v.phase = np;
1067                     if (vt !is null && v.source < 0)
1068                         LockSupport.unpark(vt);
1069                     break;
1070                 }
1071             }
1072         }
1073     }
1074 
1075     /**
1076      * Tries to decrement counts (sometimes implicitly) and possibly
1077      * arrange for a compensating worker in preparation for blocking:
1078      * If not all core workers yet exist, creates one, else if any are
1079      * unreleased (possibly including caller) releases one, else if
1080      * fewer than the minimum allowed number of workers running,
1081      * checks to see that they are all active, and if so creates an
1082      * extra worker unless over maximum limit and policy is to
1083      * saturate.  Most of these steps can fail due to interference, in
1084      * which case 0 is returned so caller will retry. A negative
1085      * return value indicates that the caller doesn't need to
1086      * re-adjust counts when later unblocked.
1087      *
1088      * @return 1: block then adjust, -1: block without adjust, 0 : retry
1089      */
1090     private int tryCompensate(WorkQueue w) {
1091         int t, n, sp;
1092         long c = ctl;
1093         WorkQueue[] ws = workQueues;
1094         if ((t = cast(short)(c >>> TC_SHIFT)) >= 0) {
1095             if (ws is null || (n = cast(int)ws.length) <= 0 || w is null)
1096                 return 0;                        // disabled
1097             else if ((sp = cast(int)c) != 0) {       // replace or release
1098                 WorkQueue v = ws[sp & (n - 1)];
1099                 int wp = w.phase;
1100                 long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c);
1101                 int np = sp & ~UNSIGNALLED;
1102                 if (v !is null) {
1103                     int vp = v.phase;
1104                     Thread vt = v.owner;
1105                     long nc = (cast(long)v.stackPred & SP_MASK) | uc;
1106                     if (vp == sp && AtomicHelper.compareAndSet(this.ctl, c, nc)) {
1107                         v.phase = np;
1108                         if (vt !is null && v.source < 0)
1109                             LockSupport.unpark(vt);
1110                         return (wp < 0) ? -1 : 1;
1111                     }
1112                 }
1113                 return 0;
1114             }
1115             else if (cast(int)(c >> RC_SHIFT) -      // reduce parallelism
1116                      cast(short)(bounds & SMASK) > 0) {
1117                 long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
1118                 return AtomicHelper.compareAndSet(this.ctl, c, nc) ? 1 : 0;
1119             }
1120             else {                               // validate
1121                 int md = mode, pc = md & SMASK, tc = pc + t, bc = 0;
1122                 bool unstable = false;
1123                 for (int i = 1; i < n; i += 2) {
1124                     WorkQueue q; ThreadEx wt; ThreadState ts;
1125                     if ((q = ws[i]) !is null) {
1126                         if (q.source == 0) {
1127                             unstable = true;
1128                             break;
1129                         }
1130                         else {
1131                             --tc;
1132                             if ((wt = q.owner) !is null &&
1133                                 ((ts = wt.getState()) == ThreadState.BLOCKED ||
1134                                  ts == ThreadState.WAITING))
1135                                 ++bc;            // worker is blocking
1136                         }
1137                     }
1138                 }
1139                 if (unstable || tc != 0 || ctl != c)
1140                     return 0;                    // inconsistent
1141                 else if (t + pc >= MAX_CAP || t >= (bounds >>> SWIDTH)) {
1142                     Predicate!(ForkJoinPool) sat;
1143                     if ((sat = saturate) !is null && sat(this))
1144                         return -1;
1145                     else if (bc < pc) {          // lagging
1146                         Thread.yield();          // for retry spins
1147                         return 0;
1148                     }
1149                     else
1150                         throw new RejectedExecutionException(
1151                             "Thread limit exceeded replacing blocked worker");
1152                 }
1153             }
1154         }
1155 
1156         long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool
1157         return AtomicHelper.compareAndSet(this.ctl, c, nc) && createWorker() ? 1 : 0;
1158     }
1159 
1160     /**
1161      * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1162      * See above for explanation.
1163      */
1164     final void runWorker(WorkQueue w) {
1165         int r = (w.id ^ ThreadLocalRandom.nextSecondarySeed()) | FIFO; // rng
1166         w.array = new IForkJoinTask[INITIAL_QUEUE_CAPACITY]; // initialize
1167         while(true) {
1168             int phase;
1169             if (scan(w, r)) {                     // scan until apparently empty
1170                 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // move (xorshift)
1171             }
1172             else if ((phase = w.phase) >= 0) {    // enqueue, then rescan
1173                 long np = (w.phase = (phase + SS_SEQ) | UNSIGNALLED) & SP_MASK;
1174                 long c, nc;
1175                 do {
1176                     w.stackPred = cast(int)(c = ctl);
1177                     nc = ((c - RC_UNIT) & UC_MASK) | np;
1178                 } while (!AtomicHelper.compareAndSet(this.ctl, c, nc));
1179 
1180                 version(HUNT_CONCURRENCY_DEBUG) {
1181                     // infof("ctl=%d, c=%d, nc=%d, stackPred=%d", ctl, c, nc, w.stackPred);
1182                 }
1183             }
1184             else {                                // already queued
1185 
1186                 int pred = w.stackPred;
1187                 ThreadEx.interrupted();           // clear before park
1188                 w.source = DORMANT;               // enable signal
1189                 long c = ctl;
1190                 int md = mode, rc = (md & SMASK) + cast(int)(c >> RC_SHIFT);
1191 
1192                 version(HUNT_CONCURRENCY_DEBUG) {
1193                     // tracef("md=%d, rc=%d, c=%d, pred=%d, phase=%d", md, rc, c, pred, phase);
1194                 }
1195 
1196                 if (md < 0) {                      // terminating
1197                     break;
1198                 } else if (rc <= 0 && (md & SHUTDOWN) != 0 && tryTerminate(false, false)) {
1199                     break;                        // quiescent shutdown
1200                 } else if (rc <= 0 && pred != 0 && phase == cast(int)c) {
1201                     long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
1202                     MonoTime d = MonoTime.currTime + keepAlive; // DateTime.currentTimeMillis();
1203                     LockSupport.parkUntil(this, d);
1204                     if (ctl == c &&               // drop on timeout if all idle
1205                         d - MonoTime.currTime <= TIMEOUT_SLOP.msecs &&
1206                         AtomicHelper.compareAndSet(this.ctl, c, nc)) {
1207                         w.phase = QUIET;
1208                         break;
1209                     }
1210                 } else if (w.phase < 0) {
1211                     LockSupport.park(this);       // OK if spuriously woken
1212             break;
1213                 }
1214                 w.source = 0;                     // disable signal
1215             }
1216         }
1217     }
1218 
1219     /**
1220      * Scans for and if found executes one or more top-level tasks from a queue.
1221      *
1222      * @return true if found an apparently non-empty queue, and
1223      * possibly ran task(s).
1224      */
1225     private bool scan(WorkQueue w, int r) {
1226         WorkQueue[] ws; int n;
1227         if ((ws = workQueues) !is null && (n = cast(int)ws.length) > 0 && w !is null) {
1228             for (int m = n - 1, j = r & m;;) {
1229                 WorkQueue q; int b;
1230                 if ((q = ws[j]) !is null && q.top != (b = q.base)) {
1231                     int qid = q.id;
1232                     IForkJoinTask[] a; 
1233                     size_t cap, k; 
1234                     IForkJoinTask t;
1235 
1236                     if ((a = q.array) !is null && (cap = cast(int)a.length) > 0) {
1237                         k = (cap - 1) & b;
1238                         // import core.atomic;  
1239                         // auto ss =     core.atomic.atomicLoad((cast(shared)a[k]));   
1240                         // FIXME: Needing refactor or cleanup -@zxp at 2/6/2019, 5:12:19 PM               
1241                         // IForkJoinTask tt = a[k];
1242                         // t = AtomicHelper.load(tt);
1243                         t = a[k];
1244                         // tracef("k=%d, t is null: %s", k, t is null);
1245                         if (q.base == b++ && t !is null && 
1246                                 AtomicHelper.compareAndSet(a[k], t, cast(IForkJoinTask)null)) {
1247                             q.base = b;
1248                             w.source = qid;
1249                             if (q.top - b > 0) signalWork();
1250 
1251                             // infof("IForkJoinTask: %s", typeid(cast(Object)t));
1252                             w.topLevelExec(t, q,  // random fairness bound
1253                                            r & ((n << TOP_BOUND_SHIFT) - 1));
1254                         }
1255                     }
1256                     return true;
1257                 }
1258                 else if (--n > 0) {
1259                     j = (j + 1) & m;
1260                 }
1261                 else {
1262                     break;
1263                 }
1264             }
1265         }
1266         return false;
1267     }
1268 
1269     /**
1270      * Helps and/or blocks until the given task is done or timeout.
1271      * First tries locally helping, then scans other queues for a task
1272      * produced by one of w's stealers; compensating and blocking if
1273      * none are found (rescanning if tryCompensate fails).
1274      *
1275      * @param w caller
1276      * @param task the task
1277      * @param deadline for timed waits, if nonzero
1278      * @return task status on exit
1279      */
1280     final int awaitJoin(WorkQueue w, IForkJoinTask task, MonoTime deadline) {
1281         if(w is null || task is null) {
1282             return 0;
1283         }
1284         int s = 0;
1285         int seed = ThreadLocalRandom.nextSecondarySeed();
1286         ICountedCompleter cc = cast(ICountedCompleter)task;
1287         if(cc !is null) {
1288             s = w.helpCC(cc, 0, false);
1289             if(s<0)
1290                 return 0;
1291         }
1292 
1293         w.tryRemoveAndExec(task);
1294         int src = w.source, id = w.id;
1295         int r = (seed >>> 16) | 1, step = (seed & ~1) | 2;
1296         s = task.getStatus();
1297         while (s >= 0) {
1298             WorkQueue[] ws;
1299             int n = (ws = workQueues) is null ? 0 : cast(int)ws.length, m = n - 1;
1300             while (n > 0) {
1301                 WorkQueue q; int b;
1302                 if ((q = ws[r & m]) !is null && q.source == id &&
1303                     q.top != (b = q.base)) {
1304                     IForkJoinTask[] a; int cap, k;
1305                     int qid = q.id;
1306                     if ((a = q.array) !is null && (cap = cast(int)a.length) > 0) {
1307                         k = (cap - 1) & b;
1308                         // FIXME: Needing refactor or cleanup -@zxp at 2/6/2019, 5:13:08 PM
1309                         // 
1310                         // IForkJoinTask t = AtomicHelper.load(a[k]);
1311                         IForkJoinTask t = a[k];
1312                         if (q.source == id && q.base == b++ &&
1313                             t !is null && AtomicHelper.compareAndSet(a[k], t, cast(IForkJoinTask)null)) {
1314                             q.base = b;
1315                             w.source = qid;
1316                             t.doExec();
1317                             w.source = src;
1318                         }
1319                     }
1320                     break;
1321                 }
1322                 else {
1323                     r += step;
1324                     --n;
1325                 }
1326             }
1327 
1328             if ((s = task.getStatus()) < 0)
1329                 break;
1330             else if (n == 0) { // empty scan
1331                 long ms; int block;
1332                 Duration ns;
1333                 if (deadline == MonoTime.zero())
1334                     ms = 0L;                       // untimed
1335                 else if ((ns = deadline - MonoTime.currTime) <= Duration.zero())
1336                     break;                         // timeout
1337                 else if ((ms = ns.total!(TimeUnit.Millisecond)()) <= 0L)
1338                     ms = 1L;                       // avoid 0 for timed wait
1339                 if ((block = tryCompensate(w)) != 0) {
1340                     task.internalWait(ms);
1341                     AtomicHelper.getAndAdd(this.ctl, (block > 0) ? RC_UNIT : 0L);
1342                 }
1343                 s = task.getStatus();
1344             }
1345         }
1346         return s;
1347     }
1348 
1349     /**
1350      * Runs tasks until {@code isQuiescent()}. Rather than blocking
1351      * when tasks cannot be found, rescans until all others cannot
1352      * find tasks either.
1353      */
1354     final void helpQuiescePool(WorkQueue w) {
1355         int prevSrc = w.source;
1356         int seed = ThreadLocalRandom.nextSecondarySeed();
1357         int r = seed >>> 16, step = r | 1;
1358         for (int source = prevSrc, released = -1;;) { // -1 until known
1359             IForkJoinTask localTask; WorkQueue[] ws;
1360             while ((localTask = w.nextLocalTask()) !is null)
1361                 localTask.doExec();
1362             if (w.phase >= 0 && released == -1)
1363                 released = 1;
1364             bool quiet = true, empty = true;
1365             int n = (ws = workQueues) is null ? 0 : cast(int)ws.length;
1366             for (int m = n - 1; n > 0; r += step, --n) {
1367                 WorkQueue q; int b;
1368                 if ((q = ws[r & m]) !is null) {
1369                     int qs = q.source;
1370                     if (q.top != (b = q.base)) {
1371                         quiet = empty = false;
1372                         IForkJoinTask[] a; int cap, k;
1373                         int qid = q.id;
1374                         if ((a = q.array) !is null && (cap = cast(int)a.length) > 0) {
1375                             if (released == 0) {    // increment
1376                                 released = 1;
1377                                 AtomicHelper.getAndAdd(this.ctl, RC_UNIT);
1378                             }
1379                             k = (cap - 1) & b;
1380                             // IForkJoinTask t = AtomicHelper.load(a[k]);
1381                             // FIXME: Needing refactor or cleanup -@zxp at 2/6/2019, 9:32:07 PM
1382                             // 
1383                             IForkJoinTask t = a[k];
1384                             if (q.base == b++ && t !is null) {
1385                                 if(AtomicHelper.compareAndSet(a[k], t, null)) {
1386                                     q.base = b;
1387                                     w.source = qid;
1388                                     t.doExec();
1389                                     w.source = source = prevSrc;
1390                                 }
1391                             }
1392                         }
1393                         break;
1394                     }
1395                     else if ((qs & QUIET) == 0)
1396                         quiet = false;
1397                 }
1398             }
1399             if (quiet) {
1400                 if (released == 0) {
1401                     AtomicHelper.getAndAdd(this.ctl, RC_UNIT);
1402                 }
1403                 w.source = prevSrc;
1404                 break;
1405             }
1406             else if (empty) {
1407                 if (source != QUIET)
1408                     w.source = source = QUIET;
1409                 if (released == 1) {                 // decrement
1410                     released = 0;
1411                     AtomicHelper.getAndAdd(this.ctl, RC_MASK & -RC_UNIT);
1412                 }
1413             }
1414         }
1415     }
1416 
1417     /**
1418      * Scans for and returns a polled task, if available.
1419      * Used only for untracked polls.
1420      *
1421      * @param submissionsOnly if true, only scan submission queues
1422      */
1423     private IForkJoinTask pollScan(bool submissionsOnly) {
1424         WorkQueue[] ws; int n;
1425         rescan: while ((mode & STOP) == 0 && (ws = workQueues) !is null &&
1426                       (n = cast(int)ws.length) > 0) {
1427             int m = n - 1;
1428             int r = ThreadLocalRandom.nextSecondarySeed();
1429             int h = r >>> 16;
1430             int origin, step;
1431             if (submissionsOnly) {
1432                 origin = (r & ~1) & m;         // even indices and steps
1433                 step = (h & ~1) | 2;
1434             }
1435             else {
1436                 origin = r & m;
1437                 step = h | 1;
1438             }
1439             bool nonempty = false;
1440             for (int i = origin, oldSum = 0, checkSum = 0;;) {
1441                 WorkQueue q;
1442                 if ((q = ws[i]) !is null) {
1443                     int b; IForkJoinTask t;
1444                     if (q.top - (b = q.base) > 0) {
1445                         nonempty = true;
1446                         if ((t = q.poll()) !is null)
1447                             return t;
1448                     }
1449                     else
1450                         checkSum += b + q.id;
1451                 }
1452                 if ((i = (i + step) & m) == origin) {
1453                     if (!nonempty && oldSum == (oldSum = checkSum))
1454                         break rescan;
1455                     checkSum = 0;
1456                     nonempty = false;
1457                 }
1458             }
1459         }
1460         return null;
1461     }
1462 
1463     /**
1464      * Gets and removes a local or stolen task for the given worker.
1465      *
1466      * @return a task, if available
1467      */
1468     final IForkJoinTask nextTaskFor(WorkQueue w) {
1469         IForkJoinTask t;
1470         if (w is null || (t = w.nextLocalTask()) is null)
1471             t = pollScan(false);
1472         return t;
1473     }
1474 
1475     // External operations
1476 
1477     /**
1478      * Adds the given task to a submission queue at submitter's
1479      * current queue, creating one if null or contended.
1480      *
1481      * @param task the task. Caller must ensure non-null.
1482      */
1483     final void externalPush(IForkJoinTask task) {
1484         // initialize caller's probe
1485         int r = ThreadLocalRandom.getProbe();
1486         if (r == 0) {
1487             ThreadLocalRandom.localInit();
1488             r = ThreadLocalRandom.getProbe();
1489         }
1490 
1491         for (;;) {
1492             WorkQueue q;
1493             int md = mode, n;
1494             WorkQueue[] ws = workQueues;
1495             if ((md & SHUTDOWN) != 0 || ws is null || (n = cast(int)ws.length) <= 0)
1496                 throw new RejectedExecutionException();
1497             else if ((q = ws[(n - 1) & r & SQMASK]) is null) { // add queue
1498                 int qid = (r | QUIET) & ~(FIFO | OWNED);
1499                 Object lock = workerNameLocker;
1500                 IForkJoinTask[] qa =
1501                     new IForkJoinTask[INITIAL_QUEUE_CAPACITY];
1502                 q = new WorkQueue(this, null);
1503                 q.array = qa;
1504                 q.id = qid;
1505                 q.source = QUIET;
1506                 if (lock !is null) {     // unless disabled, lock pool to install
1507                     synchronized (lock) {
1508                         WorkQueue[] vs; int i, vn;
1509                         if ((vs = workQueues) !is null && (vn = cast(int)vs.length) > 0 &&
1510                             vs[i = qid & (vn - 1) & SQMASK] is null)
1511                             vs[i] = q;  // else another thread already installed
1512                     }
1513                 }
1514             }
1515             else if (!q.tryLockPhase()) // move if busy
1516                 r = ThreadLocalRandom.advanceProbe(r);
1517             else {
1518                 if (q.lockedPush(task))
1519                     signalWork();
1520                 return;
1521             }
1522         }
1523     }
1524 
1525     /**
1526      * Pushes a possibly-external submission.
1527      */
1528     private IForkJoinTask externalSubmit(IForkJoinTask task) {
1529         if (task is null)
1530             throw new NullPointerException();
1531         ForkJoinWorkerThread w = cast(ForkJoinWorkerThread)Thread.getThis(); 
1532         WorkQueue q;
1533         if ( w !is null && w.pool is this &&
1534             (q = w.workQueue) !is null)
1535             q.push(task);
1536         else
1537             externalPush(task);
1538         return task;
1539     }
1540 
1541     private ForkJoinTask!(T) externalSubmit(T)(ForkJoinTask!(T) task) {
1542         if (task is null)
1543             throw new NullPointerException();
1544         ForkJoinWorkerThread w = cast(ForkJoinWorkerThread)Thread.getThis(); 
1545         WorkQueue q;
1546         if ( w !is null && w.pool is this &&
1547             (q = w.workQueue) !is null)
1548             q.push(task);
1549         else
1550             externalPush(task);
1551         return task;
1552     }
1553 
1554     /**
1555      * Returns common pool queue for an external thread.
1556      */
1557     static WorkQueue commonSubmitterQueue() {
1558         ForkJoinPool p = common;
1559         int r = ThreadLocalRandom.getProbe();
1560         WorkQueue[] ws; int n;
1561         return (p !is null && (ws = p.workQueues) !is null &&
1562                 (n = cast(int)ws.length) > 0) ?
1563             ws[(n - 1) & r & SQMASK] : null;
1564     }
1565 
1566     /**
1567      * Performs tryUnpush for an external submitter.
1568      */
1569     final bool tryExternalUnpush(IForkJoinTask task) {
1570         int r = ThreadLocalRandom.getProbe();
1571         WorkQueue[] ws; WorkQueue w; int n;
1572         return ((ws = workQueues) !is null &&
1573                 (n = cast(int)ws.length) > 0 &&
1574                 (w = ws[(n - 1) & r & SQMASK]) !is null &&
1575                 w.tryLockedUnpush(task));
1576     }
1577 
1578     /**
1579      * Performs helpComplete for an external submitter.
1580      */
1581     final int externalHelpComplete(ICountedCompleter task, int maxTasks) {
1582         int r = ThreadLocalRandom.getProbe();
1583         WorkQueue[] ws; WorkQueue w; int n;
1584         return ((ws = workQueues) !is null && (n = cast(int)ws.length) > 0 &&
1585                 (w = ws[(n - 1) & r & SQMASK]) !is null) ?
1586             w.helpCC(task, maxTasks, true) : 0;
1587     }
1588 
1589     /**
1590      * Tries to steal and run tasks within the target's computation.
1591      * The maxTasks argument supports external usages; internal calls
1592      * use zero, allowing unbounded steps (external calls trap
1593      * non-positive values).
1594      *
1595      * @param w caller
1596      * @param maxTasks if non-zero, the maximum number of other tasks to run
1597      * @return task status on exit
1598      */
1599     final int helpComplete(WorkQueue w, ICountedCompleter task,
1600                            int maxTasks) {
1601         return (w is null) ? 0 : w.helpCC(task, maxTasks, false);
1602     }
1603 
1604     /**
1605      * Returns a cheap heuristic guide for task partitioning when
1606      * programmers, frameworks, tools, or languages have little or no
1607      * idea about task granularity.  In essence, by offering this
1608      * method, we ask users only about tradeoffs in overhead vs
1609      * expected throughput and its variance, rather than how finely to
1610      * partition tasks.
1611      *
1612      * In a steady state strict (tree-structured) computation, each
1613      * thread makes available for stealing enough tasks for other
1614      * threads to remain active. Inductively, if all threads play by
1615      * the same rules, each thread should make available only a
1616      * constant number of tasks.
1617      *
1618      * The minimum useful constant is just 1. But using a value of 1
1619      * would require immediate replenishment upon each steal to
1620      * maintain enough tasks, which is infeasible.  Further,
1621      * partitionings/granularities of offered tasks should minimize
1622      * steal rates, which in general means that threads nearer the top
1623      * of computation tree should generate more than those nearer the
1624      * bottom. In perfect steady state, each thread is at
1625      * approximately the same level of computation tree. However,
1626      * producing extra tasks amortizes the uncertainty of progress and
1627      * diffusion assumptions.
1628      *
1629      * So, users will want to use values larger (but not much larger)
1630      * than 1 to both smooth over shortages and hedge
1631      * against uneven progress; as traded off against the cost of
1632      * extra task overhead. We leave the user to pick a threshold
1633      * value to compare with the results of this call to guide
1634      * decisions, but recommend values such as 3.
1635      *
1636      * When all threads are active, it is on average OK to estimate
1637      * surplus strictly locally. In steady-state, if one thread is
1638      * maintaining say 2 surplus tasks, then so are others. So we can
1639      * just use estimated queue length.  However, this strategy alone
1640      * leads to serious mis-estimates in some non-steady-state
1641      * conditions (ramp-up, ramp-down, other stalls). We can detect
1642      * many of these by further considering the number of "idle"
1643      * threads, that are known to have zero queued tasks, so
1644      * compensate by a factor of (#idle/#active) threads.
1645      */
1646     static int getSurplusQueuedTaskCount() {
1647         Thread t = Thread.getThis(); 
1648         ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)t; 
1649         ForkJoinPool pool; WorkQueue q;
1650 
1651         if (wt !is null && (pool = wt.pool) !is null &&
1652             (q = wt.workQueue) !is null) {
1653             int p = pool.mode & SMASK;
1654             int a = p + cast(int)(pool.ctl >> RC_SHIFT);
1655             int n = q.top - q.base;
1656             return n - (a > (p >>>= 1) ? 0 :
1657                         a > (p >>>= 1) ? 1 :
1658                         a > (p >>>= 1) ? 2 :
1659                         a > (p >>>= 1) ? 4 :
1660                         8);
1661         }
1662         return 0;
1663     }
1664 
1665     // Termination
1666 
1667     /**
1668      * Possibly initiates and/or completes termination.
1669      *
1670      * @param now if true, unconditionally terminate, else only
1671      * if no work and no active workers
1672      * @param enable if true, terminate when next possible
1673      * @return true if terminating or terminated
1674      */
1675     private bool tryTerminate(bool now, bool enable) {
1676         int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED
1677 
1678         while (((md = mode) & SHUTDOWN) == 0) {
1679             if (!enable || this == common)        // cannot shutdown
1680                 return false;
1681             else {
1682                 AtomicHelper.compareAndSet(this.mode, md, md | SHUTDOWN);
1683             }
1684                 
1685         }
1686 
1687         while (((md = mode) & STOP) == 0) {       // try to initiate termination
1688             if (!now) {                           // check if quiescent & empty
1689                 for (long oldSum = 0L;;) {        // repeat until stable
1690                     bool running = false;
1691                     long checkSum = ctl;
1692                     WorkQueue[] ws = workQueues;
1693                     if ((md & SMASK) + cast(int)(checkSum >> RC_SHIFT) > 0)
1694                         running = true;
1695                     else if (ws !is null) {
1696                         WorkQueue w;
1697                         for (int i = 0; i < ws.length; ++i) {
1698                             if ((w = ws[i]) !is null) {
1699                                 int s = w.source, p = w.phase;
1700                                 int d = w.id, b = w.base;
1701                                 if (b != w.top ||
1702                                     ((d & 1) == 1 && (s >= 0 || p >= 0))) {
1703                                     running = true;
1704                                     break;     // working, scanning, or have work
1705                                 }
1706                                 checkSum += ((cast(long)s << 48) + (cast(long)p << 32) +
1707                                              (cast(long)b << 16) + cast(long)d);
1708                             }
1709                         }
1710                     }
1711                     if (((md = mode) & STOP) != 0)
1712                         break;                 // already triggered
1713                     else if (running)
1714                         return false;
1715                     else if (workQueues == ws && oldSum == (oldSum = checkSum))
1716                         break;
1717                 }
1718             }
1719             if ((md & STOP) == 0)
1720                 AtomicHelper.compareAndSet(this.mode, md, md | STOP);
1721         }
1722 
1723         while (((md = mode) & TERMINATED) == 0) { // help terminate others
1724             for (long oldSum = 0L;;) {            // repeat until stable
1725                 WorkQueue[] ws; WorkQueue w;
1726                 long checkSum = ctl;
1727                 if ((ws = workQueues) !is null) {
1728                     for (int i = 0; i < ws.length; ++i) {
1729                         if ((w = ws[i]) !is null) {
1730                             ForkJoinWorkerThread wt = w.owner;
1731                             w.cancelAll();        // clear queues
1732                             if (wt !is null) {
1733                                 try {             // unblock join or park
1734                                     wt.interrupt();
1735                                 } catch (Throwable ignore) {
1736                                 }
1737                             }
1738                             checkSum += (cast(long)w.phase << 32) + w.base;
1739                         }
1740                     }
1741                 }
1742                 if (((md = mode) & TERMINATED) != 0 ||
1743                     (workQueues == ws && oldSum == (oldSum = checkSum)))
1744                     break;
1745             }
1746             if ((md & TERMINATED) != 0)
1747                 break;
1748             else if ((md & SMASK) + cast(short)(ctl >>> TC_SHIFT) > 0)
1749                 break;
1750             else if (AtomicHelper.compareAndSet(this.mode, md, md | TERMINATED)) {
1751                 synchronized (this) {
1752                     // notifyAll();                  // for awaitTermination
1753                     // TODO: Tasks pending completion -@zxp at 2/4/2019, 11:03:21 AM
1754                     // 
1755                 }
1756                 break;
1757             }
1758         }
1759         return true;
1760     }
1761 
1762     // Exported methods
1763 
1764     // Constructors
1765 
1766     /**
1767      * Creates a {@code ForkJoinPool} with parallelism equal to {@link
1768      * java.lang.Runtime#availableProcessors}, using defaults for all
1769      * other parameters (see {@link #ForkJoinPool(int,
1770      * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, bool,
1771      * int, int, int, Predicate, long, TimeUnit)}).
1772      *
1773      * @throws SecurityException if a security manager exists and
1774      *         the caller is not permitted to modify threads
1775      *         because it does not hold {@link
1776      *         java.lang.RuntimePermission}{@code ("modifyThread")}
1777      */
1778     this() {
1779         this(min(MAX_CAP, totalCPUs),
1780              defaultForkJoinWorkerThreadFactory, null, false,
1781              0, MAX_CAP, 1, null, dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE));
1782     }
1783 
1784     /**
1785      * Creates a {@code ForkJoinPool} with the indicated parallelism
1786      * level, using defaults for all other parameters (see {@link
1787      * #ForkJoinPool(int, ForkJoinWorkerThreadFactory,
1788      * UncaughtExceptionHandler, bool, int, int, int, Predicate,
1789      * long, TimeUnit)}).
1790      *
1791      * @param parallelism the parallelism level
1792      * @throws IllegalArgumentException if parallelism less than or
1793      *         equal to zero, or greater than implementation limit
1794      * @throws SecurityException if a security manager exists and
1795      *         the caller is not permitted to modify threads
1796      *         because it does not hold {@link
1797      *         java.lang.RuntimePermission}{@code ("modifyThread")}
1798      */
1799     this(int parallelism) {
1800         this(parallelism, defaultForkJoinWorkerThreadFactory, null, false,
1801              0, MAX_CAP, 1, null, dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE));
1802     }
1803 
1804     /**
1805      * Creates a {@code ForkJoinPool} with the given parameters (using
1806      * defaults for others -- see {@link #ForkJoinPool(int,
1807      * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, bool,
1808      * int, int, int, Predicate, long, TimeUnit)}).
1809      *
1810      * @param parallelism the parallelism level. For default value,
1811      * use {@link java.lang.Runtime#availableProcessors}.
1812      * @param factory the factory for creating new threads. For default value,
1813      * use {@link #defaultForkJoinWorkerThreadFactory}.
1814      * @param handler the handler for internal worker threads that
1815      * terminate due to unrecoverable errors encountered while executing
1816      * tasks. For default value, use {@code null}.
1817      * @param asyncMode if true,
1818      * establishes local first-in-first-out scheduling mode for forked
1819      * tasks that are never joined. This mode may be more appropriate
1820      * than default locally stack-based mode in applications in which
1821      * worker threads only process event-style asynchronous tasks.
1822      * For default value, use {@code false}.
1823      * @throws IllegalArgumentException if parallelism less than or
1824      *         equal to zero, or greater than implementation limit
1825      * @throws NullPointerException if the factory is null
1826      * @throws SecurityException if a security manager exists and
1827      *         the caller is not permitted to modify threads
1828      *         because it does not hold {@link
1829      *         java.lang.RuntimePermission}{@code ("modifyThread")}
1830      */
1831     this(int parallelism,
1832                         ForkJoinWorkerThreadFactory factory,
1833                         UncaughtExceptionHandler handler,
1834                         bool asyncMode) {
1835         this(parallelism, factory, handler, asyncMode,
1836              0, MAX_CAP, 1, null, dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE));
1837     }
1838 
1839     /**
1840      * Creates a {@code ForkJoinPool} with the given parameters.
1841      *
1842      * @param parallelism the parallelism level. For default value,
1843      * use {@link java.lang.Runtime#availableProcessors}.
1844      *
1845      * @param factory the factory for creating new threads. For
1846      * default value, use {@link #defaultForkJoinWorkerThreadFactory}.
1847      *
1848      * @param handler the handler for internal worker threads that
1849      * terminate due to unrecoverable errors encountered while
1850      * executing tasks. For default value, use {@code null}.
1851      *
1852      * @param asyncMode if true, establishes local first-in-first-out
1853      * scheduling mode for forked tasks that are never joined. This
1854      * mode may be more appropriate than default locally stack-based
1855      * mode in applications in which worker threads only process
1856      * event-style asynchronous tasks.  For default value, use {@code
1857      * false}.
1858      *
1859      * @param corePoolSize the number of threads to keep in the pool
1860      * (unless timed out after an elapsed keep-alive). Normally (and
1861      * by default) this is the same value as the parallelism level,
1862      * but may be set to a larger value to reduce dynamic overhead if
1863      * tasks regularly block. Using a smaller value (for example
1864      * {@code 0}) has the same effect as the default.
1865      *
1866      * @param maximumPoolSize the maximum number of threads allowed.
1867      * When the maximum is reached, attempts to replace blocked
1868      * threads fail.  (However, because creation and termination of
1869      * different threads may overlap, and may be managed by the given
1870      * thread factory, this value may be transiently exceeded.)  To
1871      * arrange the same value as is used by default for the common
1872      * pool, use {@code 256} plus the {@code parallelism} level. (By
1873      * default, the common pool allows a maximum of 256 spare
1874      * threads.)  Using a value (for example {@code
1875      * Integer.MAX_VALUE}) larger than the implementation's total
1876      * thread limit has the same effect as using this limit (which is
1877      * the default).
1878      *
1879      * @param minimumRunnable the minimum allowed number of core
1880      * threads not blocked by a join or {@link ManagedBlocker}.  To
1881      * ensure progress, when too few unblocked threads exist and
1882      * unexecuted tasks may exist, new threads are constructed, up to
1883      * the given maximumPoolSize.  For the default value, use {@code
1884      * 1}, that ensures liveness.  A larger value might improve
1885      * throughput in the presence of blocked activities, but might
1886      * not, due to increased overhead.  A value of zero may be
1887      * acceptable when submitted tasks cannot have dependencies
1888      * requiring additional threads.
1889      *
1890      * @param saturate if non-null, a predicate invoked upon attempts
1891      * to create more than the maximum total allowed threads.  By
1892      * default, when a thread is about to block on a join or {@link
1893      * ManagedBlocker}, but cannot be replaced because the
1894      * maximumPoolSize would be exceeded, a {@link
1895      * RejectedExecutionException} is thrown.  But if this predicate
1896      * returns {@code true}, then no exception is thrown, so the pool
1897      * continues to operate with fewer than the target number of
1898      * runnable threads, which might not ensure progress.
1899      *
1900      * @param keepAliveTime the elapsed time since last use before
1901      * a thread is terminated (and then later replaced if needed).
1902      * For the default value, use {@code 60, TimeUnit.SECONDS}.
1903      *
1904      * @param unit the time unit for the {@code keepAliveTime} argument
1905      *
1906      * @throws IllegalArgumentException if parallelism is less than or
1907      *         equal to zero, or is greater than implementation limit,
1908      *         or if maximumPoolSize is less than parallelism,
1909      *         of if the keepAliveTime is less than or equal to zero.
1910      * @throws NullPointerException if the factory is null
1911      * @throws SecurityException if a security manager exists and
1912      *         the caller is not permitted to modify threads
1913      *         because it does not hold {@link
1914      *         java.lang.RuntimePermission}{@code ("modifyThread")}
1915      */
    this(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        bool asyncMode,
                        int corePoolSize,
                        int maximumPoolSize,
                        int minimumRunnable,
                        Predicate!(ForkJoinPool) saturate,
                        Duration keepAliveTime) {
        // check, encode, pack parameters
        if (parallelism <= 0 || parallelism > MAX_CAP ||
            maximumPoolSize < parallelism || keepAliveTime <= Duration.zero)
            throw new IllegalArgumentException();
        if (factory is null)
            throw new NullPointerException();
        // Keep-alive in milliseconds, clamped to at least TIMEOUT_SLOP so
        // timeout bookkeeping never degenerates to zero.
        long ms = max(keepAliveTime.total!(TimeUnit.Millisecond), TIMEOUT_SLOP);

        // Core pool size is at least the parallelism level, capped at MAX_CAP.
        int corep = min(max(corePoolSize, parallelism), MAX_CAP);
        // Initial ctl word: thread counts are stored as negative offsets from
        // the targets, so incrementing toward zero signals "enough workers".
        long c = (((cast(long)(-corep)       << TC_SHIFT) & TC_MASK) |
                  ((cast(long)(-parallelism) << RC_SHIFT) & RC_MASK));
        // mode packs the parallelism level plus the FIFO (async) flag.
        int m = parallelism | (asyncMode ? FIFO : 0);
        // bounds packs (minimumRunnable - parallelism) in the low half and the
        // allowed number of spare threads in the high half.
        int maxSpares = min(maximumPoolSize, MAX_CAP) - parallelism;
        int minAvail = min(max(minimumRunnable, 0), MAX_CAP);
        int b = ((minAvail - parallelism) & SMASK) | (maxSpares << SWIDTH);
        // Round the queue-array length up to a power of two with room for
        // submission queues (even slots) alongside worker queues (odd slots).
        int n = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots
        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
        n = (n + 1) << 1; // power of two, including space for submission queues

        this.workerNamePrefix = "ForkJoinPool-" ~ nextPoolId().to!string() ~ "-worker-";
        this.workQueues = new WorkQueue[n];
        this.factory = factory;
        this.ueh = handler;
        this.saturate = saturate;
        this.keepAlive = ms.msecs;
        this.bounds = b;
        this.mode = m;
        this.ctl = c;
        // checkPermission();  // security-manager check omitted in this D port
        workerNameLocker = new Object(); 
    }
1956 
1957     private static Object newInstanceFrom(string className) {
1958         return (className.empty) ? null : Object.factory(className);
1959     }
1960 
1961     /**
1962      * Constructor for common pool using parameters possibly
1963      * overridden by system properties
1964      */
    private this(byte forCommonPoolOnly) {
        int parallelism = -1;
        ForkJoinWorkerThreadFactory fac = null;
        UncaughtExceptionHandler handler = null;
        try {  // ignore exceptions in accessing/parsing properties
            ConfigBuilder config = Environment.getProperties();
            // Optional override of the default parallelism level.
            string pp = config.getProperty("hunt.concurrency.ForkJoinPool.common.parallelism");
            if (!pp.empty) {
                parallelism = pp.to!int();
            }

            // Optional custom worker-thread factory, by class name.
            string className = config.getProperty("hunt.concurrency.ForkJoinPool.common.threadFactory");
            fac = cast(ForkJoinWorkerThreadFactory) newInstanceFrom(className);

            // Optional uncaught-exception handler, by class name.
            className = config.getProperty("hunt.concurrency.ForkJoinPool.common.exceptionHandler");
            handler = cast(UncaughtExceptionHandler) newInstanceFrom(className);
        } catch (Exception ignore) {
            version(HUNT_DEBUG) {
                warning(ignore);
            }
        }

        if (fac is null) {
            // if (System.getSecurityManager() is null)
                fac = defaultForkJoinWorkerThreadFactory;
            // else // use security-managed default
            //     fac = new InnocuousForkJoinWorkerThreadFactory();
        }
        if (parallelism < 0 && // default 1 less than #cores
            (parallelism = totalCPUs - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;

        // Same packing scheme as the general constructor: negative thread
        // counts in ctl, and bounds with COMMON_MAX_SPARES in the high half.
        long c = (((cast(long)(-parallelism) << TC_SHIFT) & TC_MASK) |
                  ((cast(long)(-parallelism) << RC_SHIFT) & RC_MASK));
        int b = ((1 - parallelism) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
        // Round queue-array length up to a power of two (with submission slots).
        int n = (parallelism > 1) ? parallelism - 1 : 1;
        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
        n = (n + 1) << 1;

        this.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
        this.workerNameLocker = new Object(); 
        this.workQueues = new WorkQueue[n];
        this.factory = fac;
        this.ueh = handler;
        this.saturate = null;   // common pool never uses a saturation predicate
        this.keepAlive = DEFAULT_KEEPALIVE.msecs;
        this.bounds = b;
        this.mode = parallelism;
        this.ctl = c;
    }
2017 
2018     /**
2019      * Returns the common pool instance. This pool is statically
2020      * constructed; its run state is unaffected by attempts to {@link
2021      * #shutdown} or {@link #shutdownNow}. However this pool and any
2022      * ongoing processing are automatically terminated upon program
2023      * {@link System#exit}.  Any program that relies on asynchronous
2024      * task processing to complete before program termination should
2025      * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
2026      * before exit.
2027      *
2028      * @return the common pool instance
2029      */
    static ForkJoinPool commonPool() {
        // 'common' is statically constructed; assumed non-null here.
        return common;
    }
2033 
2034     // Execution methods
2035 
2036     /**
2037      * Performs the given task, returning its result upon completion.
2038      * If the computation encounters an unchecked Exception or Error,
2039      * it is rethrown as the outcome of this invocation.  Rethrown
2040      * exceptions behave in the same way as regular exceptions, but,
2041      * when possible, contain stack traces (as displayed for example
2042      * using {@code ex.printStackTrace()}) of both the current thread
2043      * as well as the thread actually encountering the exception;
2044      * minimally only the latter.
2045      *
2046      * @param task the task
2047      * @param (T) the type of the task's result
2048      * @return the task's result
2049      * @throws NullPointerException if the task is null
2050      * @throws RejectedExecutionException if the task cannot be
2051      *         scheduled for execution
2052      */
2053     T invoke(T)(ForkJoinTask!(T) task) {
2054         if (task is null)
2055             throw new NullPointerException();
2056         externalSubmit!(T)(task);
2057         version(HUNT_CONCURRENCY_DEBUG) {
2058             infof("waiting the result...");
2059             T c = task.join();
2060             infof("final result: %s", c);
2061             return c;
2062         } else {
2063             return task.join();
2064         }
2065     }
2066 
2067     /**
2068      * Arranges for (asynchronous) execution of the given task.
2069      *
2070      * @param task the task
2071      * @throws NullPointerException if the task is null
2072      * @throws RejectedExecutionException if the task cannot be
2073      *         scheduled for execution
2074      */
    void execute(IForkJoinTask task) {
        // Fire-and-forget: submit without waiting for completion.
        externalSubmit(task);
    }
2078 
2079     // AbstractExecutorService methods
2080 
2081     /**
2082      * @throws NullPointerException if the task is null
2083      * @throws RejectedExecutionException if the task cannot be
2084      *         scheduled for execution
2085      */
2086     void execute(Runnable task) {
2087         if (task is null)
2088             throw new NullPointerException();
2089         IForkJoinTask job = cast(IForkJoinTask) task;
2090         if (job is null) { // avoid re-wrap
2091             warning("job is null");
2092             job = new RunnableExecuteAction(task);
2093         }
2094         externalSubmit(job);
2095     }
2096 
2097     /**
2098      * Submits a ForkJoinTask for execution.
2099      *
2100      * @param task the task to submit
2101      * @param (T) the type of the task's result
2102      * @return the task
2103      * @throws NullPointerException if the task is null
2104      * @throws RejectedExecutionException if the task cannot be
2105      *         scheduled for execution
2106      */
    ForkJoinTask!(T) submitTask(T)(ForkJoinTask!(T) task) {
        // Forward to the shared external-submission path.
        return externalSubmit(task);
    }
2110 
2111     /**
2112      * @throws NullPointerException if the task is null
2113      * @throws RejectedExecutionException if the task cannot be
2114      *         scheduled for execution
2115      */
2116     ForkJoinTask!(T) submitTask(T)(Callable!(T) task) {
2117         return externalSubmit(new AdaptedCallable!(T)(task));
2118     }
2119 
2120     /**
2121      * @throws NullPointerException if the task is null
2122      * @throws RejectedExecutionException if the task cannot be
2123      *         scheduled for execution
2124      */
2125     ForkJoinTask!(T) submitTask(T)(Runnable task, T result) {
2126         return externalSubmit(new AdaptedRunnable!(T)(task, result));
2127     }
2128 
2129     /**
2130      * @throws NullPointerException if the task is null
2131      * @throws RejectedExecutionException if the task cannot be
2132      *         scheduled for execution
2133      */
2134 
2135     IForkJoinTask submitTask(Runnable task) {
2136         if (task is null)
2137             throw new NullPointerException();
2138         IForkJoinTask t = cast(IForkJoinTask)task;
2139         return externalSubmit(t !is null
2140             ? cast(ForkJoinTask!(void)) task // avoid re-wrap
2141             : new AdaptedRunnableAction(task));
2142     }
2143 
2144     /**
2145      * @throws NullPointerException       {@inheritDoc}
2146      * @throws RejectedExecutionException {@inheritDoc}
2147      */
2148     List!(Future!(T)) invokeAll(T)(Collection!(Callable!(T)) tasks) {
2149         // In previous versions of this class, this method constructed
2150         // a task to run ForkJoinTask.invokeAll, but now external
2151         // invocation of multiple tasks is at least as efficient.
2152         ArrayList!(Future!(T)) futures = new ArrayList!(Future!(T))(tasks.size());
2153 
2154         try {
2155             foreach (Callable!(T) t ; tasks) {
2156                 ForkJoinTask!(T) f = new AdaptedCallable!(T)(t);
2157                 futures.add(f);
2158                 externalSubmit(f);
2159             }
2160             for (int i = 0, size = futures.size(); i < size; i++)
2161                 (cast(IForkJoinTask)(futures.get(i))).quietlyJoin();
2162             return futures;
2163         } catch (Throwable t) {
2164             for (int i = 0, size = futures.size(); i < size; i++)
2165                 futures.get(i).cancel(false);
2166             throw t;
2167         }
2168     }
2169 
2170     /**
2171      * Returns the factory used for constructing new workers.
2172      *
2173      * @return the factory used for constructing new workers
2174      */
    ForkJoinWorkerThreadFactory getFactory() {
        // Immutable after construction; safe to read without synchronization.
        return factory;
    }
2178 
2179     /**
2180      * Returns the handler for internal worker threads that terminate
2181      * due to unrecoverable errors encountered while executing tasks.
2182      *
2183      * @return the handler, or {@code null} if none
2184      */
    UncaughtExceptionHandler getUncaughtExceptionHandler() {
        // May be null if no handler was configured.
        return ueh;
    }
2188 
2189     /**
2190      * Returns the targeted parallelism level of this pool.
2191      *
2192      * @return the targeted parallelism level of this pool
2193      */
2194     int getParallelism() {
2195         int par = mode & SMASK;
2196         return (par > 0) ? par : 1;
2197     }
2198 
2199     /**
2200      * Returns the targeted parallelism level of the common pool.
2201      *
2202      * @return the targeted parallelism level of the common pool
2203      */
    static int getCommonPoolParallelism() {
        // Computed once during static initialization of the common pool.
        return COMMON_PARALLELISM;
    }
2207 
2208     /**
2209      * Returns the number of worker threads that have started but not
2210      * yet terminated.  The result returned by this method may differ
2211      * from {@link #getParallelism} when threads are created to
2212      * maintain parallelism when others are cooperatively blocked.
2213      *
2214      * @return the number of worker threads
2215      */
2216     int getPoolSize() {
2217         return ((mode & SMASK) + cast(short)(ctl >>> TC_SHIFT));
2218     }
2219 
2220     /**
2221      * Returns {@code true} if this pool uses local first-in-first-out
2222      * scheduling mode for forked tasks that are never joined.
2223      *
2224      * @return {@code true} if this pool uses async mode
2225      */
2226     bool getAsyncMode() {
2227         return (mode & FIFO) != 0;
2228     }
2229 
2230     /**
2231      * Returns an estimate of the number of worker threads that are
2232      * not blocked waiting to join tasks or for other managed
2233      * synchronization. This method may overestimate the
2234      * number of running threads.
2235      *
2236      * @return the number of worker threads
2237      */
2238     int getRunningThreadCount() {
2239         WorkQueue[] ws; WorkQueue w;
2240         // VarHandle.acquireFence();
2241         int rc = 0;
2242         if ((ws = workQueues) !is null) {
2243             for (int i = 1; i < cast(int)ws.length; i += 2) {
2244                 if ((w = ws[i]) !is null && w.isApparentlyUnblocked())
2245                     ++rc;
2246             }
2247         }
2248         return rc;
2249     }
2250 
2251     /**
2252      * Returns an estimate of the number of threads that are currently
2253      * stealing or executing tasks. This method may overestimate the
2254      * number of active threads.
2255      *
2256      * @return the number of active threads
2257      */
2258     int getActiveThreadCount() {
2259         int r = (mode & SMASK) + cast(int)(ctl >> RC_SHIFT);
2260         return (r <= 0) ? 0 : r; // suppress momentarily negative values
2261     }
2262 
2263     /**
2264      * Returns {@code true} if all worker threads are currently idle.
2265      * An idle worker is one that cannot obtain a task to execute
2266      * because none are available to steal from other threads, and
2267      * there are no pending submissions to the pool. This method is
2268      * conservative; it might not return {@code true} immediately upon
2269      * idleness of all threads, but will eventually become true if
2270      * threads remain inactive.
2271      *
2272      * @return {@code true} if all threads are currently idle
2273      */
    bool isQuiescent() {
        // Retry until a consistent snapshot is observed (ctl unchanged).
        for (;;) {
            long c = ctl;
            int md = mode, pc = md & SMASK;
            // tc: total released workers; rc: workers not queued/blocked.
            int tc = pc + cast(short)(c >>> TC_SHIFT);
            int rc = pc + cast(int)(c >> RC_SHIFT);
            if ((md & (STOP | TERMINATED)) != 0)
                return true;
            else if (rc > 0)
                return false;
            else {
                WorkQueue[] ws; WorkQueue v;
                if ((ws = workQueues) !is null) {
                    // Worker queues live at odd indices.
                    for (int i = 1; i < ws.length; i += 2) {
                        if ((v = ws[i]) !is null) {
                            // Positive source: the worker is actively taking tasks.
                            if (v.source > 0)
                                return false;
                            --tc;
                        }
                    }
                }
                // Quiescent only if every worker was accounted for and ctl
                // did not change while scanning; otherwise retry.
                if (tc == 0 && ctl == c)
                    return true;
            }
        }
    }
2300 
2301     /**
2302      * Returns an estimate of the total number of tasks stolen from
2303      * one thread's work queue by another. The reported value
2304      * underestimates the actual total number of steals when the pool
2305      * is not quiescent. This value may be useful for monitoring and
2306      * tuning fork/join programs: in general, steal counts should be
2307      * high enough to keep threads busy, but low enough to avoid
2308      * overhead and contention across threads.
2309      *
2310      * @return the number of steals
2311      */
2312     long getStealCount() {
2313         long count = stealCount;
2314         WorkQueue[] ws; WorkQueue w;
2315         if ((ws = workQueues) !is null) {
2316             for (int i = 1; i < ws.length; i += 2) {
2317                 if ((w = ws[i]) !is null)
2318                     count += cast(long)w.nsteals & 0xffffffffL;
2319             }
2320         }
2321         return count;
2322     }
2323 
2324     /**
2325      * Returns an estimate of the total number of tasks currently held
2326      * in queues by worker threads (but not including tasks submitted
2327      * to the pool that have not begun executing). This value is only
2328      * an approximation, obtained by iterating across all threads in
2329      * the pool. This method may be useful for tuning task
2330      * granularities.
2331      *
2332      * @return the number of queued tasks
2333      */
2334     long getQueuedTaskCount() {
2335         WorkQueue[] ws; WorkQueue w;
2336         // VarHandle.acquireFence();
2337         int count = 0;
2338         if ((ws = workQueues) !is null) {
2339             for (int i = 1; i < cast(int)ws.length; i += 2) {
2340                 if ((w = ws[i]) !is null)
2341                     count += w.queueSize();
2342             }
2343         }
2344         return count;
2345     }
2346 
2347     /**
2348      * Returns an estimate of the number of tasks submitted to this
2349      * pool that have not yet begun executing.  This method may take
2350      * time proportional to the number of submissions.
2351      *
2352      * @return the number of queued submissions
2353      */
2354     int getQueuedSubmissionCount() {
2355         WorkQueue[] ws; WorkQueue w;
2356         // VarHandle.acquireFence();
2357         int count = 0;
2358         if ((ws = workQueues) !is null) {
2359             for (int i = 0; i < cast(int)ws.length; i += 2) {
2360                 if ((w = ws[i]) !is null)
2361                     count += w.queueSize();
2362             }
2363         }
2364         return count;
2365     }
2366 
2367     /**
2368      * Returns {@code true} if there are any tasks submitted to this
2369      * pool that have not yet begun executing.
2370      *
2371      * @return {@code true} if there are any queued submissions
2372      */
2373     bool hasQueuedSubmissions() {
2374         WorkQueue[] ws; WorkQueue w;
2375         // VarHandle.acquireFence();
2376         if ((ws = workQueues) !is null) {
2377             for (int i = 0; i < cast(int)ws.length; i += 2) {
2378                 if ((w = ws[i]) !is null && !w.isEmpty())
2379                     return true;
2380             }
2381         }
2382         return false;
2383     }
2384 
2385     /**
2386      * Removes and returns the next unexecuted submission if one is
2387      * available.  This method may be useful in extensions to this
2388      * class that re-assign work in systems with multiple pools.
2389      *
2390      * @return the next submission, or {@code null} if none
2391      */
    protected IForkJoinTask pollSubmission() {
        // Delegates to pollScan; the 'true' flag presumably selects
        // submission queues first — see pollScan for the exact semantics.
        return pollScan(true);
    }
2395 
2396     /**
2397      * Removes all available unexecuted submitted and forked tasks
2398      * from scheduling queues and adds them to the given collection,
2399      * without altering their execution status. These may include
2400      * artificially generated or wrapped tasks. This method is
2401      * designed to be invoked only when the pool is known to be
2402      * quiescent. Invocations at other times may not remove all
2403      * tasks. A failure encountered while attempting to add elements
2404      * to collection {@code c} may result in elements being in
2405      * neither, either or both collections when the associated
2406      * exception is thrown.  The behavior of this operation is
2407      * undefined if the specified collection is modified while the
2408      * operation is in progress.
2409      *
2410      * @param c the collection to transfer elements into
2411      * @return the number of elements transferred
2412      */
2413     protected int drainTasksTo(Collection!IForkJoinTask c) {
2414         WorkQueue[] ws; WorkQueue w; IForkJoinTask t;
2415         // VarHandle.acquireFence();
2416         int count = 0;
2417         if ((ws = workQueues) !is null) {
2418             for (int i = 0; i < ws.length; ++i) {
2419                 if ((w = ws[i]) !is null) {
2420                     while ((t = w.poll()) !is null) {
2421                         c.add(t);
2422                         ++count;
2423                     }
2424                 }
2425             }
2426         }
2427         return count;
2428     }
2429 
2430     /**
2431      * Returns a string identifying this pool, as well as its state,
2432      * including indications of run state, parallelism level, and
2433      * worker and task counts.
2434      *
2435      * @return a string identifying this pool, as well as its state
2436      */
    override string toString() {
        // Use a single pass through workQueues to collect counts
        int md = mode; // read fields first
        long c = ctl;
        long st = stealCount;
        // qt: queued worker tasks; qs: queued submissions; rc: running workers.
        long qt = 0L, qs = 0L; int rc = 0;
        WorkQueue[] ws; WorkQueue w;
        if ((ws = workQueues) !is null) {
            for (int i = 0; i < ws.length; ++i) {
                if ((w = ws[i]) !is null) {
                    int size = w.queueSize();
                    // Even slots are submission queues; odd slots are workers.
                    if ((i & 1) == 0)
                        qs += size;
                    else {
                        qt += size;
                        st += cast(long)w.nsteals & 0xffffffffL;
                        if (w.isApparentlyUnblocked())
                            ++rc;
                    }
                }
            }
        }

        // Unpack thread counts from the ctl snapshot (negative offsets
        // from the targeted parallelism pc).
        int pc = (md & SMASK);
        int tc = pc + cast(short)(c >>> TC_SHIFT);
        int ac = pc + cast(int)(c >> RC_SHIFT);
        if (ac < 0) // ignore negative
            ac = 0;
        string level = ((md & TERMINATED) != 0 ? "Terminated" :
                        (md & STOP)       != 0 ? "Terminating" :
                        (md & SHUTDOWN)   != 0 ? "Shutting down" :
                        "Running");
        return super.toString() ~
            "[" ~ level ~
            ", parallelism = " ~ pc.to!string() ~
            ", size = " ~ tc.to!string() ~
            ", active = " ~ ac.to!string() ~
            ", running = " ~ rc.to!string() ~
            ", steals = " ~ st.to!string() ~
            ", tasks = " ~ qt.to!string() ~
            ", submissions = " ~ qs.to!string() ~
            "]";
    }
2480 
2481     /**
2482      * Possibly initiates an orderly shutdown in which previously
2483      * submitted tasks are executed, but no new tasks will be
2484      * accepted. Invocation has no effect on execution state if this
2485      * is the {@link #commonPool()}, and no additional effect if
2486      * already shut down.  Tasks that are in the process of being
2487      * submitted concurrently during the course of this method may or
2488      * may not be rejected.
2489      *
2490      * @throws SecurityException if a security manager exists and
2491      *         the caller is not permitted to modify threads
2492      *         because it does not hold {@link
2493      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2494      */
    void shutdown() {
        // checkPermission();  // security-manager check omitted in this D port
        // now=false: let already-submitted tasks finish; enable=true.
        tryTerminate(false, true);
    }
2499 
2500     /**
2501      * Possibly attempts to cancel and/or stop all tasks, and reject
2502      * all subsequently submitted tasks.  Invocation has no effect on
2503      * execution state if this is the {@link #commonPool()}, and no
2504      * additional effect if already shut down. Otherwise, tasks that
2505      * are in the process of being submitted or executed concurrently
2506      * during the course of this method may or may not be
2507      * rejected. This method cancels both existing and unexecuted
2508      * tasks, in order to permit termination in the presence of task
2509      * dependencies. So the method always returns an empty list
2510      * (unlike the case for some other Executors).
2511      *
2512      * @return an empty list
2513      * @throws SecurityException if a security manager exists and
2514      *         the caller is not permitted to modify threads
2515      *         because it does not hold {@link
2516      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2517      */
2518     List!(Runnable) shutdownNow() {
2519         // checkPermission();
2520         tryTerminate(true, true);
2521         return Collections.emptyList!(Runnable)();
2522     }
2523 
2524     /**
2525      * Returns {@code true} if all tasks have completed following shut down.
2526      *
2527      * @return {@code true} if all tasks have completed following shut down
2528      */
2529     bool isTerminated() {
2530         return (mode & TERMINATED) != 0;
2531     }
2532 
2533     /**
2534      * Returns {@code true} if the process of termination has
2535      * commenced but not yet completed.  This method may be useful for
2536      * debugging. A return of {@code true} reported a sufficient
2537      * period after shutdown may indicate that submitted tasks have
2538      * ignored or suppressed interruption, or are waiting for I/O,
2539      * causing this executor not to properly terminate. (See the
2540      * advisory notes for class {@link ForkJoinTask} stating that
2541      * tasks should not normally entail blocking operations.  But if
2542      * they do, they must abort them on interrupt.)
2543      *
2544      * @return {@code true} if terminating but not yet terminated
2545      */
2546     bool isTerminating() {
2547         int md = mode;
2548         return (md & STOP) != 0 && (md & TERMINATED) == 0;
2549     }
2550 
2551     /**
2552      * Returns {@code true} if this pool has been shut down.
2553      *
2554      * @return {@code true} if this pool has been shut down
2555      */
2556     bool isShutdown() {
2557         return (mode & SHUTDOWN) != 0;
2558     }
2559 
2560     /**
2561      * Blocks until all tasks have completed execution after a
2562      * shutdown request, or the timeout occurs, or the current thread
2563      * is interrupted, whichever happens first. Because the {@link
2564      * #commonPool()} never terminates until program shutdown, when
2565      * applied to the common pool, this method is equivalent to {@link
2566      * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
2567      *
     * @param timeout the maximum time to wait (a {@code Duration}; this
     * D port takes no separate time-unit argument)
2570      * @return {@code true} if this executor terminated and
2571      *         {@code false} if the timeout elapsed before termination
2572      * @throws InterruptedException if interrupted while waiting
2573      */
2574     bool awaitTermination(Duration timeout)
2575         {
2576         if (ThreadEx.interrupted())
2577             throw new InterruptedException();
2578         if (this == common) {
2579             awaitQuiescence(timeout);
2580             return false;
2581         }
2582         long nanos = timeout.total!(TimeUnit.HectoNanosecond);
2583         if (isTerminated())
2584             return true;
2585         if (nanos <= 0L)
2586             return false;
2587         long deadline = Clock.currStdTime + nanos;
2588         synchronized (this) {
2589             for (;;) {
2590                 if (isTerminated())
2591                     return true;
2592                 if (nanos <= 0L)
2593                     return false;
2594                 // long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
2595                 // wait(millis > 0L ? millis : 1L);
2596                 // ThreadEx.currentThread().par
2597                 ThreadEx.sleep(dur!(TimeUnit.HectoNanosecond)(nanos));
2598                 nanos = deadline - Clock.currStdTime;
2599             }
2600         }
2601     }
2602 
2603     /**
2604      * If called by a ForkJoinTask operating in this pool, equivalent
2605      * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
2606      * waits and/or attempts to assist performing tasks until this
2607      * pool {@link #isQuiescent} or the indicated timeout elapses.
2608      *
     * @param timeout the maximum time to wait (a {@code Duration}; this
     * D port takes no separate time-unit argument)
2611      * @return {@code true} if quiescent; {@code false} if the
2612      * timeout elapsed.
2613      */
    bool awaitQuiescence(Duration timeout) {
        // NOTE: despite the name, this value is in hecto-nanoseconds
        // (100ns units), consistent with Clock.currStdTime used below.
        long nanos = timeout.total!(TimeUnit.HectoNanosecond)();
        Thread thread = Thread.getThis();
        ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)thread;
        if (wt !is null && wt.pool is this) {
            // Called from one of this pool's own workers: help run tasks
            // until quiescent (equivalent to ForkJoinTask.helpQuiesce).
            helpQuiescePool(wt.workQueue);
            return true;
        }
        else {
            // External caller: repeatedly poll and execute tasks, yielding
            // between attempts, until quiescent or timed out.
            for (long startTime = Clock.currStdTime;;) {
                IForkJoinTask t;
                if ((t = pollScan(false)) !is null)
                    t.doExec();
                else if (isQuiescent())
                    return true;
                else if ((Clock.currStdTime - startTime) > nanos)
                    return false;
                else
                    Thread.yield(); // cannot block
            }
        }
    }
2636 
2637     /**
2638      * Waits and/or attempts to assist performing tasks indefinitely
2639      * until the {@link #commonPool()} {@link #isQuiescent}.
2640      */
    static void quiesceCommonPool() {
        // Duration.max makes this effectively wait indefinitely.
        common.awaitQuiescence(Duration.max);
    }
2644 
2645     /**
2646      * Runs the given possibly blocking task.  When {@linkplain
2647      * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this
2648      * method possibly arranges for a spare thread to be activated if
2649      * necessary to ensure sufficient parallelism while the current
2650      * thread is blocked in {@link ManagedBlocker#block blocker.block()}.
2651      *
2652      * <p>This method repeatedly calls {@code blocker.isReleasable()} and
2653      * {@code blocker.block()} until either method returns {@code true}.
2654      * Every call to {@code blocker.block()} is preceded by a call to
2655      * {@code blocker.isReleasable()} that returned {@code false}.
2656      *
2657      * <p>If not running in a ForkJoinPool, this method is
2658      * behaviorally equivalent to
2659      * <pre> {@code
2660      * while (!blocker.isReleasable())
2661      *   if (blocker.block())
2662      *     break;}</pre>
2663      *
2664      * If running in a ForkJoinPool, the pool may first be expanded to
2665      * ensure sufficient parallelism available during the call to
2666      * {@code blocker.block()}.
2667      *
2668      * @param blocker the blocker task
2669      * @throws InterruptedException if {@code blocker.block()} did so
2670      */
2671     static void managedBlock(ManagedBlocker blocker) {
2672         if (blocker is null) throw new NullPointerException();
2673         ForkJoinPool p;
2674         WorkQueue w;
2675         Thread t = Thread.getThis();
2676         ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)t;
2677         if (wt !is null && (p = wt.pool) !is null &&
2678             (w = wt.workQueue) !is null) {
2679             int block;
2680             while (!blocker.isReleasable()) {
2681                 if ((block = p.tryCompensate(w)) != 0) {
2682                     try {
2683                         do {} while (!blocker.isReleasable() &&
2684                                      !blocker.block());
2685                     } finally {
2686                         AtomicHelper.getAndAdd(p.ctl, (block > 0) ? RC_UNIT : 0L);
2687                     }
2688                     break;
2689                 }
2690             }
2691         }
2692         else {
2693             do {} while (!blocker.isReleasable() &&
2694                          !blocker.block());
2695         }
2696     }
2697 
2698     /**
2699      * If the given executor is a ForkJoinPool, poll and execute
2700      * AsynchronousCompletionTasks from worker's queue until none are
2701      * available or blocker is released.
2702      */
2703     static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
2704         ForkJoinPool p = cast(ForkJoinPool)e;
2705         if (p !is null) {
2706             WorkQueue w; WorkQueue[] ws; int r, n;
2707             
2708             Thread thread = Thread.getThis();
2709             ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)thread; 
2710             if (wt !is null && wt.pool is p)
2711                 w = wt.workQueue;
2712             else if ((r = ThreadLocalRandom.getProbe()) != 0 &&
2713                      (ws = p.workQueues) !is null && (n = cast(int)ws.length) > 0)
2714                 w = ws[(n - 1) & r & SQMASK];
2715             else
2716                 w = null;
2717             if (w !is null)
2718                 w.helpAsyncBlocker(blocker);
2719         }
2720     }
2721 
2722     // AbstractExecutorService overrides.  These rely on undocumented
2723     // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
2724     // implement RunnableFuture.
2725 
2726     protected RunnableFuture!(T) newTaskFor(T)(Runnable runnable, T value) {
2727         return new AdaptedRunnable!(T)(runnable, value);
2728     }
2729 
2730     protected RunnableFuture!(T) newTaskFor(T)(Callable!(T) callable) {
2731         return new AdaptedCallable!(T)(callable);
2732     }
2733 
2734     shared static this() {
2735         int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
2736         try {
2737             ConfigBuilder config = Environment.getProperties();
2738             string p = config.getProperty
2739                 ("hunt.concurrency.ForkJoinPool.common.maximumSpares");
2740             if (!p.empty())
2741                 commonMaxSpares = p.to!int();
2742         } catch (Exception ignore) {
2743             version(HUNT_DEBUG) warning(ignore.toString());
2744         }
2745         COMMON_MAX_SPARES = commonMaxSpares;
2746 
2747         defaultForkJoinWorkerThreadFactory =
2748             new DefaultForkJoinWorkerThreadFactory();
2749         common = new ForkJoinPool(cast(byte)0);
2750         COMMON_PARALLELISM = max(common.mode & SMASK, 1);
2751     }
2752 }
2753 
2754 
2755 /**
2756  * Factory for innocuous worker threads.
2757  */
2758 private final class InnocuousForkJoinWorkerThreadFactory
2759     : ForkJoinWorkerThreadFactory {
2760 
2761     /**
2762      * An ACC to restrict permissions for the factory itself.
2763      * The constructed workers have no permissions set.
2764      */
2765     // private static final AccessControlContext ACC = contextWithPermissions(
2766     //     modifyThreadPermission,
2767     //     new RuntimePermission("enableContextClassLoaderOverride"),
2768     //     new RuntimePermission("modifyThreadGroup"),
2769     //     new RuntimePermission("getClassLoader"),
2770     //     new RuntimePermission("setContextClassLoader"));
2771 
2772     final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
2773         return new InnocuousForkJoinWorkerThread(pool);
2774         // return AccessController.doPrivileged(
2775         //     new PrivilegedAction<>() {
2776         //         ForkJoinWorkerThread run() {
2777         //             return new ForkJoinWorkerThread.
2778         //                 InnocuousForkJoinWorkerThread(pool); }},
2779         //     ACC);
2780     }
2781 }
2782 
2783 
2784 /**
2785  * Factory for creating new {@link ForkJoinWorkerThread}s.
2786  * A {@code ForkJoinWorkerThreadFactory} must be defined and used
2787  * for {@code ForkJoinWorkerThread} subclasses that extend base
2788  * functionality or initialize threads with different contexts.
2789  */
2790 interface ForkJoinWorkerThreadFactory {
2791     /**
2792      * Returns a new worker thread operating in the given pool.
2793      * Returning null or throwing an exception may result in tasks
2794      * never being executed.  If this method throws an exception,
2795      * it is relayed to the caller of the method (for example
2796      * {@code execute}) causing attempted thread creation. If this
2797      * method returns null or throws an exception, it is not
2798      * retried until the next attempted creation (for example
2799      * another call to {@code execute}).
2800      *
2801      * @param pool the pool this thread works in
2802      * @return the new worker thread, or {@code null} if the request
2803      *         to create a thread is rejected
2804      * @throws NullPointerException if the pool is null
2805      */
2806     ForkJoinWorkerThread newThread(ForkJoinPool pool);
2807 }    
2808 
2809 // Nested classes
2810 
2811 // static AccessControlContext contextWithPermissions(Permission ... perms) {
2812 //     Permissions permissions = new Permissions();
2813 //     for (Permission perm : perms)
2814 //         permissions.add(perm);
2815 //     return new AccessControlContext(
2816 //         new ProtectionDomain[] { new ProtectionDomain(null, permissions) });
2817 // }
2818 
2819 /**
2820  * Default ForkJoinWorkerThreadFactory implementation; creates a
2821  * new ForkJoinWorkerThread using the system class loader as the
2822  * thread context class loader.
2823  */
2824 private final class DefaultForkJoinWorkerThreadFactory : ForkJoinWorkerThreadFactory {
2825     // private static final AccessControlContext ACC = contextWithPermissions(
2826     //     new RuntimePermission("getClassLoader"),
2827     //     new RuntimePermission("setContextClassLoader"));
2828 
2829     final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
2830         return new ForkJoinWorkerThread(pool);
2831         // return AccessController.doPrivileged(
2832         //     new PrivilegedAction<>() {
2833         //         ForkJoinWorkerThread run() {
2834         //             return new ForkJoinWorkerThread(
2835         //                 pool, ClassLoader.getSystemClassLoader()); }},
2836         //     ACC);
2837     }
2838 }
2839 
2840 
2841 /**
2842  * Interface for extending managed parallelism for tasks running
2843  * in {@link ForkJoinPool}s.
2844  *
2845  * <p>A {@code ManagedBlocker} provides two methods.  Method
2846  * {@link #isReleasable} must return {@code true} if blocking is
2847  * not necessary. Method {@link #block} blocks the current thread
2848  * if necessary (perhaps internally invoking {@code isReleasable}
2849  * before actually blocking). These actions are performed by any
2850  * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
2851  * The unusual methods in this API accommodate synchronizers that
2852  * may, but don't usually, block for long periods. Similarly, they
2853  * allow more efficient internal handling of cases in which
2854  * additional workers may be, but usually are not, needed to
2855  * ensure sufficient parallelism.  Toward this end,
2856  * implementations of method {@code isReleasable} must be amenable
2857  * to repeated invocation.
2858  *
2859  * <p>For example, here is a ManagedBlocker based on a
2860  * ReentrantLock:
2861  * <pre> {@code
2862  * class ManagedLocker implements ManagedBlocker {
2863  *   final ReentrantLock lock;
2864  *   bool hasLock = false;
2865  *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
2866  *   bool block() {
2867  *     if (!hasLock)
2868  *       lock.lock();
2869  *     return true;
2870  *   }
2871  *   bool isReleasable() {
2872  *     return hasLock || (hasLock = lock.tryLock());
2873  *   }
2874  * }}</pre>
2875  *
2876  * <p>Here is a class that possibly blocks waiting for an
2877  * item on a given queue:
2878  * <pre> {@code
2879  * class QueueTaker!(E) : ManagedBlocker {
2880  *   final BlockingQueue!(E) queue;
2881  *   E item = null;
2882  *   QueueTaker(BlockingQueue!(E) q) { this.queue = q; }
2883  *   bool block() {
2884  *     if (item is null)
2885  *       item = queue.take();
2886  *     return true;
2887  *   }
2888  *   bool isReleasable() {
2889  *     return item !is null || (item = queue.poll()) !is null;
2890  *   }
2891  *   E getItem() { // call after pool.managedBlock completes
2892  *     return item;
2893  *   }
2894  * }}</pre>
2895  */
2896 static interface ManagedBlocker {
2897     /**
2898      * Possibly blocks the current thread, for example waiting for
2899      * a lock or condition.
2900      *
2901      * @return {@code true} if no additional blocking is necessary
2902      * (i.e., if isReleasable would return true)
2903      * @throws InterruptedException if interrupted while waiting
2904      * (the method is not required to do so, but is allowed to)
2905      */
2906     bool block();
2907 
2908     /**
2909      * Returns {@code true} if blocking is unnecessary.
2910      * @return {@code true} if blocking is unnecessary
2911      */
2912     bool isReleasable();
2913 }
2914 
2915 
2916 
2917 /**
2918  * A thread managed by a {@link ForkJoinPool}, which executes
2919  * {@link ForkJoinTask}s.
2920  * This class is subclassable solely for the sake of adding
2921  * functionality -- there are no overridable methods dealing with
2922  * scheduling or execution.  However, you can override initialization
2923  * and termination methods surrounding the main task processing loop.
2924  * If you do create such a subclass, you will also need to supply a
2925  * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to
2926  * {@linkplain ForkJoinPool#ForkJoinPool(int, ForkJoinWorkerThreadFactory,
2927  * UncaughtExceptionHandler, bool, int, int, int, Predicate, long, TimeUnit)
2928  * use it} in a {@code ForkJoinPool}.
2929  *
2930  * @author Doug Lea
2931  */
2932 class ForkJoinWorkerThread : ThreadEx {
2933     /*
2934      * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
2935      * ForkJoinTasks. For explanation, see the internal documentation
2936      * of class ForkJoinPool.
2937      *
2938      * This class just maintains links to its pool and WorkQueue.  The
2939      * pool field is set immediately upon construction, but the
2940      * workQueue field is not set until a call to registerWorker
2941      * completes. This leads to a visibility race, that is tolerated
2942      * by requiring that the workQueue field is only accessed by the
2943      * owning thread.
2944      *
2945      * Support for (non-public) subclass InnocuousForkJoinWorkerThread
2946      * requires that we break quite a lot of encapsulation (via helper
2947      * methods in ThreadLocalRandom) both here and in the subclass to
2948      * access and set Thread fields.
2949      */
2950 
2951     ForkJoinPool pool;                // the pool this thread works in
2952     WorkQueue workQueue;              // work-stealing mechanics
2953 
2954     /**
2955      * Creates a ForkJoinWorkerThread operating in the given pool.
2956      *
2957      * @param pool the pool this thread works in
2958      * @throws NullPointerException if pool is null
2959      */
2960     protected this(ForkJoinPool pool) {
2961         // Use a placeholder until a useful name can be set in registerWorker
2962         super("aForkJoinWorkerThread");
2963         this.pool = pool;
2964         this.workQueue = pool.registerWorker(this);
2965     }
2966 
2967     /**
2968      * Version for use by the default pool.  Supports setting the
2969      * context class loader.  This is a separate constructor to avoid
2970      * affecting the protected constructor.
2971      */
2972     // this(ForkJoinPool pool, ClassLoader ccl) {
2973     //     super("aForkJoinWorkerThread");
2974     //     super.setContextClassLoader(ccl);
2975     //     this.pool = pool;
2976     //     this.workQueue = pool.registerWorker(this);
2977     // }
2978 
2979     /**
2980      * Version for InnocuousForkJoinWorkerThread.
2981      */
2982     this(ForkJoinPool pool,
2983                         //  ClassLoader ccl,
2984                          ThreadGroupEx threadGroup,
2985                         ) { //  AccessControlContext acc
2986         super(threadGroup, null, "aForkJoinWorkerThread");
2987         // super.setContextClassLoader(ccl);
2988         // ThreadLocalRandom.setInheritedAccessControlContext(this, acc);
2989         // ThreadLocalRandom.eraseThreadLocals(this); // clear before registering
2990         this.pool = pool;
2991         this.workQueue = pool.registerWorker(this);
2992     }
2993 
2994     /**
2995      * Returns the pool hosting this thread.
2996      *
2997      * @return the pool
2998      */
2999     ForkJoinPool getPool() {
3000         return pool;
3001     }
3002 
3003     /**
3004      * Returns the unique index number of this thread in its pool.
3005      * The returned value ranges from zero to the maximum number of
3006      * threads (minus one) that may exist in the pool, and does not
3007      * change during the lifetime of the thread.  This method may be
3008      * useful for applications that track status or collect results
3009      * per-worker-thread rather than per-task.
3010      *
3011      * @return the index number
3012      */
3013     int getPoolIndex() {
3014         return workQueue.getPoolIndex();
3015     }
3016 
3017     /**
3018      * Initializes internal state after construction but before
3019      * processing any tasks. If you override this method, you must
3020      * invoke {@code super.onStart()} at the beginning of the method.
3021      * Initialization requires care: Most fields must have legal
3022      * default values, to ensure that attempted accesses from other
3023      * threads work correctly even before this thread starts
3024      * processing tasks.
3025      */
3026     protected void onStart() {
3027     }
3028 
3029     /**
3030      * Performs cleanup associated with termination of this worker
3031      * thread.  If you override this method, you must invoke
3032      * {@code super.onTermination} at the end of the overridden method.
3033      *
3034      * @param exception the exception causing this thread to abort due
3035      * to an unrecoverable error, or {@code null} if completed normally
3036      */
3037     protected void onTermination(Throwable exception) {
3038     }
3039 
3040     /**
3041      * This method is required to be public, but should never be
3042      * called explicitly. It performs the main run loop to execute
3043      * {@link ForkJoinTask}s.
3044      */
3045     override void run() {
3046         if (workQueue.array !is null)
3047             return;
3048         // only run once
3049         Throwable exception = null;
3050 
3051         try {
3052             onStart();
3053             pool.runWorker(workQueue);
3054         } catch (Throwable ex) {
3055             version(HUNT_DEBUG) {
3056                 warning(ex);
3057             } else {
3058                 warning(ex.msg);
3059             }
3060             exception = ex;
3061         }
3062 
3063         try {
3064             onTermination(exception);
3065         } catch(Throwable ex) {
3066             version(HUNT_DEBUG) {
3067                 warning(ex);
3068             } else {
3069                 warning(ex.msg);
3070             }
3071             if (exception is null)
3072                 exception = ex;
3073         }
3074         pool.deregisterWorker(this, exception);  
3075     }
3076 
3077     /**
3078      * Non-hook method for InnocuousForkJoinWorkerThread.
3079      */
3080     void afterTopLevelExec() {
3081     }
3082 
3083 
3084     int awaitJoin(IForkJoinTask task) {
3085         int s = task.getStatus(); 
3086         WorkQueue w = workQueue;
3087             if(w.tryUnpush(task) && (s = task.doExec()) < 0 )
3088                 return s;
3089             else
3090                 return pool.awaitJoin(w, task, MonoTime.zero());
3091     }
3092 
3093     /**
3094      * If the current thread is operating in a ForkJoinPool,
3095      * unschedules and returns, without executing, a task externally
3096      * submitted to the pool, if one is available. Availability may be
3097      * transient, so a {@code null} result does not necessarily imply
3098      * quiescence of the pool.  This method is designed primarily to
3099      * support extensions, and is unlikely to be useful otherwise.
3100      *
3101      * @return a task, or {@code null} if none are available
3102      */
3103     protected static IForkJoinTask pollSubmission() {
3104         ForkJoinWorkerThread t = cast(ForkJoinWorkerThread)Thread.getThis();
3105         return t !is null ? t.pool.pollSubmission() : null;
3106     }
3107 }
3108 
3109 
3110 /**
3111  * A worker thread that has no permissions, is not a member of any
3112  * user-defined ThreadGroupEx, uses the system class loader as
3113  * thread context class loader, and erases all ThreadLocals after
3114  * running each top-level task.
3115  */
3116 final class InnocuousForkJoinWorkerThread : ForkJoinWorkerThread {
3117     /** The ThreadGroupEx for all InnocuousForkJoinWorkerThreads */
3118     private __gshared ThreadGroupEx innocuousThreadGroup;
3119         // AccessController.doPrivileged(new PrivilegedAction<>() {
3120         //     ThreadGroupEx run() {
3121         //         ThreadGroupEx group = Thread.getThis().getThreadGroup();
3122         //         for (ThreadGroupEx p; (p = group.getParent()) !is null; )
3123         //             group = p;
3124         //         return new ThreadGroupEx(
3125         //             group, "InnocuousForkJoinWorkerThreadGroup");
3126         //     }});
3127 
3128     /** An AccessControlContext supporting no privileges */
3129     // private static final AccessControlContext INNOCUOUS_ACC =
3130     //     new AccessControlContext(
3131     //         new ProtectionDomain[] { new ProtectionDomain(null, null) });
3132 
3133     shared static this() {
3134         // ThreadGroupEx group = Thread.getThis().getThreadGroup();
3135         // for (ThreadGroupEx p; (p = group.getParent()) !is null; )
3136         //     group = p;
3137         innocuousThreadGroup = new ThreadGroupEx(
3138                     null, "InnocuousForkJoinWorkerThreadGroup");
3139     }
3140 
3141     this(ForkJoinPool pool) {
3142         super(pool,
3143             //   ClassLoader.getSystemClassLoader(),
3144               innocuousThreadGroup,
3145             //   INNOCUOUS_ACC
3146               );
3147     }
3148 
3149     override // to erase ThreadLocals
3150     void afterTopLevelExec() {
3151         // ThreadLocalRandom.eraseThreadLocals(this);
3152     }
3153 
3154     override // to silently fail
3155     void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { }
3156 
3157     // override // paranoically
3158     // void setContextClassLoader(ClassLoader cl) {
3159     //     throw new SecurityException("setContextClassLoader");
3160     // }
3161 }
3162 
3163 
3164 /**
3165  * Queues supporting work-stealing as well as external task
3166  * submission. See above for descriptions and algorithms.
3167  */
3168 final class WorkQueue {
3169     int source;       // source queue id, or sentinel
3170     int id;                    // pool index, mode, tag
3171     shared int base;                  // index of next slot for poll
3172     shared int top;                   // index of next slot for push
3173     shared int phase;        // versioned, negative: queued, 1: locked
3174     int stackPred;             // pool stack (ctl) predecessor link
3175     int nsteals;               // number of steals
3176     IForkJoinTask[] array;   // the queued tasks; power of 2 size
3177     ForkJoinPool pool;   // the containing pool (may be null)
3178     ForkJoinWorkerThread owner; // owning thread or null if shared
3179 
    /**
     * Creates a queue for the given pool, owned by the given worker
     * thread, or null for a shared (external submission) queue.
     */
    this(ForkJoinPool pool, ForkJoinWorkerThread owner) {
        this.pool = pool;
        this.owner = owner;
        // Place indices in the center of array (that is not yet allocated)
        base = top = INITIAL_QUEUE_CAPACITY >>> 1;
    }
3186 
3187     /**
3188      * Tries to lock shared queue by CASing phase field.
3189      */
3190     final bool tryLockPhase() {
3191         return cas(&this.phase, 0, 1);
3192         // return PHASE.compareAndSet(this, 0, 1);
3193     }
3194 
    /**
     * Unlocks a shared queue by resetting the phase field to 0.
     */
    final void releasePhaseLock() {
        // PHASE.setRelease(this, 0);
        atomicStore(this.phase, 0);
    }
3199 
3200     /**
3201      * Returns an exportable index (used by ForkJoinWorkerThread).
3202      */
3203     final int getPoolIndex() {
3204         return (id & 0xffff) >>> 1; // ignore odd/even tag bit
3205     }
3206 
3207     /**
3208      * Returns the approximate number of tasks in the queue.
3209      */
3210     final int queueSize() {
3211         // int n = cast(int)BASE.getAcquire(this) - top;
3212         int n = atomicLoad(this.base) - top;
3213         return (n >= 0) ? 0 : -n; // ignore negative
3214     }
3215 
3216     /**
3217      * Provides a more accurate estimate of whether this queue has
3218      * any tasks than does queueSize, by checking whether a
3219      * near-empty queue has at least one unclaimed task.
3220      */
3221     final bool isEmpty() {
3222         IForkJoinTask[] a; int n, cap, b;
3223         // VarHandle.acquireFence(); // needed by external callers
3224         return ((n = (b = base) - top) >= 0 || // possibly one task
3225                 (n == -1 && ((a = array) is null ||
3226                              (cap = cast(int)a.length) == 0 ||
3227                              a[(cap - 1) & b] is null)));
3228     }
3229 
3230     /**
3231      * Pushes a task. Call only by owner in unshared queues.
3232      *
3233      * @param task the task. Caller must ensure non-null.
3234      * @throws RejectedExecutionException if array cannot be resized
3235      */
3236     final void push(IForkJoinTask task) {
3237         IForkJoinTask[] a;
3238         int s = top, d, cap, m;
3239         ForkJoinPool p = pool;
3240         if ((a = array) !is null && (cap = cast(int)a.length) > 0) {
3241             m = cap - 1;
3242             // FIXME: Needing refactor or cleanup -@zxp at 2019/2/9 8:40:55
3243             // 
3244             // AtomicHelper.store(a[m & s], task);
3245             a[m & s] = task;
3246             // QA.setRelease(a, (m = cap - 1) & s, task);
3247             top = s + 1;
3248             if (((d = s - atomicLoad(this.base)) & ~1) == 0 &&
3249                 p !is null) {                 // size 0 or 1
3250                 // VarHandle.fullFence();
3251                 p.signalWork();
3252             }
3253             else if (d == m)
3254                 growArray(false);
3255         }
3256     }
3257 
3258     /**
3259      * Version of push for shared queues. Call only with phase lock held.
3260      * @return true if should signal work
3261      */
3262     final bool lockedPush(IForkJoinTask task) {
3263         IForkJoinTask[] a;
3264         bool signal = false;
3265         int s = top, b = base, cap, d;
3266         if ((a = array) !is null && (cap = cast(int)a.length) > 0) {
3267             a[(cap - 1) & s] = task;
3268             top = s + 1;
3269             if (b - s + cap - 1 == 0)
3270                 growArray(true);
3271             else {
3272                 phase = 0; // full unlock
3273                 if (((s - base) & ~1) == 0) // size 0 or 1
3274                     signal = true;
3275             }
3276         }
3277         return signal;
3278     }
3279 
3280     /**
3281      * Doubles the capacity of array. Call either by owner or with
3282      * lock held -- it is OK for base, but not top, to move while
3283      * resizings are in progress.
3284      */
3285     final void growArray(bool locked) {
3286         IForkJoinTask[] newA = null;
3287         try {
3288             IForkJoinTask[] oldA; int oldSize, newSize;
3289             if ((oldA = array) !is null && (oldSize = cast(int)oldA.length) > 0 &&
3290                 (newSize = oldSize << 1) <= MAXIMUM_QUEUE_CAPACITY &&
3291                 newSize > 0) {
3292                 try {
3293                     newA = new IForkJoinTask[newSize];
3294                 } catch (OutOfMemoryError ex) {
3295                 }
3296                 if (newA !is null) { // poll from old array, push to new
3297                     int oldMask = oldSize - 1, newMask = newSize - 1;
3298                     for (int s = top - 1, k = oldMask; k >= 0; --k) {
3299                         // IForkJoinTask x = AtomicHelper.getAndSet(oldA[s & oldMask], null);
3300                         // FIXME: Needing refactor or cleanup -@zxp at 2019/2/9 下午8:57:26
3301                         // 
3302                         IForkJoinTask x = oldA[s & oldMask];
3303                         oldA[s & oldMask] = null;
3304 
3305                         if (x !is null)
3306                             newA[s-- & newMask] = x;
3307                         else
3308                             break;
3309                     }
3310                     array = newA;
3311                     // VarHandle.releaseFence();
3312                 }
3313             }
3314         } finally {
3315             if (locked)
3316                 phase = 0;
3317         }
3318         if (newA is null)
3319             throw new RejectedExecutionException("Queue capacity exceeded");
3320     }
3321 
3322     /**
3323      * Takes next task, if one exists, in FIFO order.
3324      */
3325     final IForkJoinTask poll() {
3326         int b, k, cap; 
3327         IForkJoinTask[] a;
3328         while ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
3329                top - (b = base) > 0) {
3330             k = (cap - 1) & b;
3331             // FIXME: Needing refactor or cleanup -@zxp at 2019/2/9 8:42:05
3332             // 
3333             
3334             // IForkJoinTask t = AtomicHelper.load(a[k]);
3335             IForkJoinTask t = a[k];
3336             if (base == b++) {
3337                 if (t is null)
3338                     Thread.yield(); // await index advance
3339                 else if (AtomicHelper.compareAndSet(a[k], t, null)) {
3340                 // else if (QA.compareAndSet(a, k, t, null)) {
3341                     AtomicHelper.store(this.base, b);
3342                     return t;
3343                 }
3344             }
3345         }
3346         return null;
3347     }
3348 
3349     /**
3350      * Takes next task, if one exists, in order specified by mode.
3351      */
3352     final IForkJoinTask nextLocalTask() {
3353         IForkJoinTask t = null;
3354         int md = id, b, s, d, cap; IForkJoinTask[] a;
3355         if ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
3356             (d = (s = top) - (b = base)) > 0) {
3357             if ((md & FIFO) == 0 || d == 1) {
3358                 auto index = (cap - 1) & --s;
3359                 t = AtomicHelper.getAndSet(a[index], cast(IForkJoinTask)null);
3360                 if(t !is null) {
3361                     AtomicHelper.store(this.top, s);
3362                 }
3363             } else {
3364                 auto index = (cap - 1) & b++;
3365                 t = AtomicHelper.getAndSet(a[index], cast(IForkJoinTask)null);
3366                 if (t !is null) {
3367                     AtomicHelper.store(this.base, b);
3368                 }
3369                 else // on contention in FIFO mode, use regular poll
3370                     t = poll();
3371             } 
3372         }
3373         return t;
3374     }
3375 
3376     /**
3377      * Returns next task, if one exists, in order specified by mode.
3378      */
3379     final IForkJoinTask peek() {
3380         int cap; IForkJoinTask[] a;
3381         return ((a = array) !is null && (cap = cast(int)a.length) > 0) ?
3382             a[(cap - 1) & ((id & FIFO) != 0 ? base : top - 1)] : null;
3383     }
3384 
3385     /**
3386      * Pops the given task only if it is at the current top.
3387      */
3388     final bool tryUnpush(IForkJoinTask task) {
3389         bool popped = false;
3390         int s, cap; IForkJoinTask[] a;
3391         if ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
3392             (s = top) != base) {
3393             popped = AtomicHelper.compareAndSet(a[(cap - 1) & --s], task, null);
3394             if(popped) {
3395                 AtomicHelper.store(this.top, s);
3396             }
3397         }
3398         return popped;
3399     }
3400 
3401     /**
3402      * Shared version of tryUnpush.
3403      */
3404     final bool tryLockedUnpush(IForkJoinTask task) {
3405         bool popped = false;
3406         int s = top - 1, k, cap; IForkJoinTask[] a;
3407         if ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
3408             a[k = (cap - 1) & s] == task && tryLockPhase()) {
3409             if (top == s + 1 && array == a) {
3410                 popped = AtomicHelper.compareAndSet(a[k], task, null);
3411                 if(popped) top = s;
3412             }
3413             releasePhaseLock();
3414         }
3415         return popped;
3416     }
3417 
3418     /**
3419      * Removes and cancels all known tasks, ignoring any exceptions.
3420      */
3421     final void cancelAll() {
3422         for (IForkJoinTask t; (t = poll()) !is null; )
3423             IForkJoinTask.cancelIgnoringExceptions(t);
3424     }
3425 
    // Specialized execution methods

    /**
     * Runs the given (stolen) task if nonnull, as well as
     * remaining local tasks and others available from the given
     * queue, up to bound n (to avoid infinite unfairness).
     *
     * @param t the stolen task to run first (may be null)
     * @param q the queue it was stolen from (may be null)
     * @param n bound on additional tasks to run
     */
    final void topLevelExec(IForkJoinTask t, WorkQueue q, int n) {
        if (t !is null && q !is null) { // hoist checks
            int nstolen = 1;
            for (;;) {
                t.doExec();
                if (n-- < 0)
                    break;
                else if ((t = nextLocalTask()) is null) {
                    // Local queue drained: steal again from q.
                    if ((t = q.poll()) is null)
                        break;
                    else
                        ++nstolen;
                }
            }
            // Bookkeeping after the batch; afterTopLevelExec lets
            // InnocuousForkJoinWorkerThread reset per-thread state.
            ForkJoinWorkerThread thread = owner;
            nsteals += nstolen;
            source = 0;
            if (thread !is null)
                thread.afterTopLevelExec();
        }
    }
3454 
3455     /**
3456      * If present, removes task from queue and executes it.
3457      */
3458     final void tryRemoveAndExec(IForkJoinTask task) {
3459         IForkJoinTask[] a; int s, cap;
3460         if ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
3461             (s = top) - base > 0) { // traverse from top
3462             for (int m = cap - 1, ns = s - 1, i = ns; ; --i) {
3463                 int index = i & m;
3464                 IForkJoinTask t = a[index]; //QA.get(a, index);
3465                 if (t is null)
3466                     break;
3467                 else if (t == task) {
3468                     // if (AtomicHelper.compareAndSet(a[index], t, null)) 
3469                     if(a[index] == t) {
3470                         a[index] = null;
3471                         top = ns;   // safely shift down
3472                         for (int j = i; j != ns; ++j) {
3473                             IForkJoinTask f;
3474                             int pindex = (j + 1) & m;
3475                             f = a[pindex];// QA.get(a, pindex);
3476                             a[pindex] = null;
3477                             // AtomicHelper.store(a[pindex], null);
3478                             // QA.setVolatile(a, pindex, null);
3479                             int jindex = j & m;
3480                             a[jindex] = f;
3481                             // AtomicHelper.store(a[jindex], f);
3482                             // QA.setRelease(a, jindex, f);
3483                         }
3484                         // VarHandle.releaseFence();
3485                         t.doExec();
3486                     }
3487                     break;
3488                 }
3489             }
3490         }
3491     }
3492 
3493     /**
3494      * Tries to pop and run tasks within the target's computation
3495      * until done, not found, or limit exceeded.
3496      *
3497      * @param task root of CountedCompleter computation
3498      * @param limit max runs, or zero for no limit
3499      * @param shared true if must lock to extract task
3500      * @return task status on exit
3501      */
3502     final int helpCC(ICountedCompleter task, int limit, bool isShared) {
3503         int status = 0;
3504         if (task !is null && (status = task.getStatus()) >= 0) {
3505             int s, k, cap; IForkJoinTask[] a;
3506             while ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
3507                    (s = top) - base > 0) {
3508                 ICountedCompleter v = null;
3509                 IForkJoinTask o = a[k = (cap - 1) & (s - 1)];
3510                 ICountedCompleter t = cast(ICountedCompleter)o;
3511                 if (t !is null) {
3512                     for (ICountedCompleter f = t;;) {
3513                         if (f != task) {
3514                             if ((f = f.getCompleter()) is null)
3515                                 break;
3516                         }
3517                         else if (isShared) {
3518                             if (tryLockPhase()) {
3519                                 if (top == s && array == a &&
3520                                     AtomicHelper.compareAndSet(a[k], t, cast(IForkJoinTask)null)) {
3521                                     top = s - 1;
3522                                     v = t;
3523                                 }
3524                                 releasePhaseLock();
3525                             }
3526                             break;
3527                         }
3528                         else {
3529                             if (AtomicHelper.compareAndSet(a[k], t, cast(IForkJoinTask)null)) {
3530                                 top = s - 1;
3531                                 v = t;
3532                             }
3533                             break;
3534                         }
3535                     }
3536                 }
3537                 if (v !is null)
3538                     v.doExec();
3539                 if ((status = task.getStatus()) < 0 || v is null ||
3540                     (limit != 0 && --limit == 0))
3541                     break;
3542             }
3543         }
3544         return status;
3545     }
3546 
3547     /**
3548      * Tries to poll and run AsynchronousCompletionTasks until
3549      * none found or blocker is released
3550      *
3551      * @param blocker the blocker
3552      */
3553     final void helpAsyncBlocker(ManagedBlocker blocker) {
3554         if (blocker !is null) {
3555             int b, k, cap; IForkJoinTask[] a; IForkJoinTask t;
3556             while ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
3557                    top - (b = base) > 0) {
3558                 k = (cap - 1) & b;
3559                 // t = AtomicHelper.load(a[k]);
3560                 t = a[k];
3561                 if (blocker.isReleasable())
3562                     break;
3563                 else if (base == b++ && t !is null) {
3564                     AsynchronousCompletionTask at = cast(AsynchronousCompletionTask)t;
3565                     if (at is null)
3566                         break;
3567                     else if (AtomicHelper.compareAndSet(a[k], t, null)) {
3568                         AtomicHelper.store(this.base, b);
3569                         t.doExec();
3570                     }
3571                 }
3572             }
3573         }
3574     }
3575 
3576     /**
3577      * Returns true if owned and not known to be blocked.
3578      */
3579     final bool isApparentlyUnblocked() {
3580         ThreadEx wt; ThreadState s;
3581         return ((wt = owner) !is null &&
3582                 (s = wt.getState()) != ThreadState.BLOCKED &&
3583                 s != ThreadState.WAITING &&
3584                 s != ThreadState.TIMED_WAITING);
3585     }
3586 
3587     // VarHandle mechanics.
3588     // static final VarHandle PHASE;
3589     // static final VarHandle BASE;
3590     // static final VarHandle TOP;
3591     // static {
3592     //     try {
3593     //         MethodHandles.Lookup l = MethodHandles.lookup();
3594     //         PHASE = l.findVarHandle(WorkQueue.class, "phase", int.class);
3595     //         BASE = l.findVarHandle(WorkQueue.class, "base", int.class);
3596     //         TOP = l.findVarHandle(WorkQueue.class, "top", int.class);
3597     //     } catch (ReflectiveOperationException e) {
3598     //         throw new ExceptionInInitializerError(e);
3599     //     }
3600     // }
3601 }
3602 
3603 
3604 
3605 /**
3606  * A marker interface identifying asynchronous tasks produced by
3607  * {@code async} methods. This may be useful for monitoring,
3608  * debugging, and tracking asynchronous activities.
3609  *
3610  */
3611 interface AsynchronousCompletionTask {
3612 }