1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 
28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 module hunt.concurrency.ForkJoinPool; 37 38 import hunt.concurrency.AbstractExecutorService; 39 import hunt.concurrency.atomic.AtomicHelper; 40 import hunt.concurrency.Exceptions; 41 import hunt.concurrency.ForkJoinTask; 42 import hunt.concurrency.ForkJoinTaskHelper; 43 import hunt.concurrency.ThreadLocalRandom; 44 import hunt.concurrency.thread; 45 46 import hunt.collection.List; 47 import hunt.collection.Collection; 48 import hunt.collection.Collections; 49 import hunt.logging.ConsoleLogger; 50 import hunt.Exceptions; 51 import hunt.Functions; 52 import hunt.system.Environment; 53 import hunt.system.Memory; 54 import hunt.util.Common; 55 import hunt.util.Configuration; 56 import hunt.util.DateTime; 57 import hunt.util.Runnable; 58 59 import core.atomic; 60 import core.sync.mutex; 61 import core.thread; 62 import core.time; 63 64 import std.algorithm; 65 import std.array; 66 import std.conv; 67 import std.datetime; 68 69 alias ReentrantLock = Mutex; 70 71 // import java.lang.Thread.UncaughtExceptionHandler; 72 // import java.lang.invoke.MethodHandles; 73 // import java.lang.invoke.VarHandle; 74 // import java.security.AccessController; 75 // import java.security.AccessControlContext; 76 // import java.security.Permission; 77 // import java.security.Permissions; 78 // import java.security.PrivilegedAction; 79 // import java.security.ProtectionDomain; 80 // import hunt.collection.ArrayList; 81 // import hunt.collection.Collection; 82 // import java.util.Collections; 83 // import java.util.List; 84 // import hunt.util.functional.Predicate; 85 // import hunt.concurrency.locks.LockSupport; 86 87 88 private { 89 90 // Constants shared across ForkJoinPool and WorkQueue 91 92 // Bounds 
93 enum int SWIDTH = 16; // width of short 94 enum int SMASK = 0xffff; // short bits == max index 95 enum int MAX_CAP = 0x7fff; // max #workers - 1 96 enum int SQMASK = 0x007e; // max 64 (even) slots 97 98 // Masks and units for WorkQueue.phase and ctl sp subfield 99 enum int UNSIGNALLED = 1 << 31; // must be negative 100 enum int SS_SEQ = 1 << 16; // version count 101 enum int QLOCK = 1; // must be 1 102 103 // Mode bits and sentinels, some also used in WorkQueue id and source fields 104 enum int OWNED = 1; // queue has owner thread 105 enum int FIFO = 1 << 16; // fifo queue or access mode 106 enum int SHUTDOWN = 1 << 18; // lifetime status bit (see notes on field "mode") 107 enum int TERMINATED = 1 << 19; // lifetime status bit (see notes on field "mode") 108 enum int STOP = 1 << 31; // must be negative; same bit as UNSIGNALLED 109 enum int QUIET = 1 << 30; // not scanning or working 110 enum int DORMANT = QUIET | UNSIGNALLED; // both QUIET and UNSIGNALLED bits set 111 112 /** 113 * The maximum number of top-level polls per worker before 114 * checking other queues, expressed as a bit shift to, in effect, 115 * multiply by pool size, and then use as random value mask, so 116 * average bound is about poolSize*(1<<TOP_BOUND_SHIFT). See 117 * the "Scanning" notes in the ForkJoinPool implementation overview for rationale. 118 */ 119 enum int TOP_BOUND_SHIFT = 10; 120 121 /** 122 * Initial capacity of work-stealing queue array. 123 * Must be a power of two, at least 2. 124 */ 125 enum int INITIAL_QUEUE_CAPACITY = 1 << 13; 126 127 /** 128 * Maximum capacity for queue arrays. Must be a power of two less 129 * than or equal to 1 << (31 - width of array entry) to ensure 130 * lack of wraparound of index calculations, but defined to a 131 * value a bit less than this to help users trap runaway programs 132 * before saturating systems. 133 */ 134 enum int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M 135 136 } 137 138 /** 139 * An {@link ExecutorService} for running {@link ForkJoinTask}s. 140 * A {@code ForkJoinPool} provides the entry point for submissions 141 * from non-{@code ForkJoinTask} clients, as well as management and 142 * monitoring operations. 
143 * 144 * <p>A {@code ForkJoinPool} differs from other kinds of {@link 145 * ExecutorService} mainly by virtue of employing 146 * <em>work-stealing</em>: all threads in the pool attempt to find and 147 * execute tasks submitted to the pool and/or created by other active 148 * tasks (eventually blocking waiting for work if none exist). This 149 * enables efficient processing when most tasks spawn other subtasks 150 * (as do most {@code ForkJoinTask}s), as well as when many small 151 * tasks are submitted to the pool from external clients. Especially 152 * when setting <em>asyncMode</em> to true in constructors, {@code 153 * ForkJoinPool}s may also be appropriate for use with event-style 154 * tasks that are never joined. All worker threads are initialized 155 * with {@link Thread#isDaemon} set {@code true}. 156 * 157 * <p>A static {@link #commonPool()} is available and appropriate for 158 * most applications. The common pool is used by any ForkJoinTask that 159 * is not explicitly submitted to a specified pool. Using the common 160 * pool normally reduces resource usage (its threads are slowly 161 * reclaimed during periods of non-use, and reinstated upon subsequent 162 * use). 163 * 164 * <p>For applications that require separate or custom pools, a {@code 165 * ForkJoinPool} may be constructed with a given target parallelism 166 * level; by default, equal to the number of available processors. 167 * The pool attempts to maintain enough active (or available) threads 168 * by dynamically adding, suspending, or resuming internal worker 169 * threads, even if some tasks are stalled waiting to join others. 170 * However, no such adjustments are guaranteed in the face of blocked 171 * I/O or other unmanaged synchronization. The nested {@link 172 * ManagedBlocker} interface enables extension of the kinds of 173 * synchronization accommodated. 
The default policies may be 174 * overridden using a constructor with parameters corresponding to 175 * those documented in class {@link ThreadPoolExecutor}. 176 * 177 * <p>In addition to execution and lifecycle control methods, this 178 * class provides status check methods (for example 179 * {@link #getStealCount}) that are intended to aid in developing, 180 * tuning, and monitoring fork/join applications. Also, method 181 * {@link #toString} returns indications of pool state in a 182 * convenient form for informal monitoring. 183 * 184 * <p>As is the case with other ExecutorServices, there are three 185 * main task execution methods summarized in the following table. 186 * These are designed to be used primarily by clients not already 187 * engaged in fork/join computations in the current pool. The main 188 * forms of these methods accept instances of {@code ForkJoinTask}, 189 * but overloaded forms also allow mixed execution of plain {@code 190 * Runnable}- or {@code Callable}- based activities as well. However, 191 * tasks that are already executing in a pool should normally instead 192 * use the within-computation forms listed in the table unless using 193 * async event-style tasks that are not usually joined, in which case 194 * there is little difference among choice of methods. 
195 * 196 * <table class="plain"> 197 * <caption>Summary of task execution methods</caption> 198 * <tr> 199 * <td></td> 200 * <th scope="col"> Call from non-fork/join clients</th> 201 * <th scope="col"> Call from within fork/join computations</th> 202 * </tr> 203 * <tr> 204 * <th scope="row" style="text-align:left"> Arrange async execution</th> 205 * <td> {@link #execute(ForkJoinTask)}</td> 206 * <td> {@link ForkJoinTask#fork}</td> 207 * </tr> 208 * <tr> 209 * <th scope="row" style="text-align:left"> Await and obtain result</th> 210 * <td> {@link #invoke(ForkJoinTask)}</td> 211 * <td> {@link ForkJoinTask#invoke}</td> 212 * </tr> 213 * <tr> 214 * <th scope="row" style="text-align:left"> Arrange exec and obtain Future</th> 215 * <td> {@link #submit(ForkJoinTask)}</td> 216 * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td> 217 * </tr> 218 * </table> 219 * 220 * <p>The parameters used to construct the common pool may be controlled by 221 * setting the following {@linkplain System#getProperty system properties}: 222 * <ul> 223 * <li>{@code hunt.concurrency.ForkJoinPool.common.parallelism} 224 * - the parallelism level, a non-negative integer 225 * <li>{@code hunt.concurrency.ForkJoinPool.common.threadFactory} 226 * - the class name of a {@link ForkJoinWorkerThreadFactory}. 227 * The {@linkplain ClassLoader#getSystemClassLoader() system class loader} 228 * is used to load this class. 229 * <li>{@code hunt.concurrency.ForkJoinPool.common.exceptionHandler} 230 * - the class name of a {@link UncaughtExceptionHandler}. 231 * The {@linkplain ClassLoader#getSystemClassLoader() system class loader} 232 * is used to load this class. 233 * <li>{@code hunt.concurrency.ForkJoinPool.common.maximumSpares} 234 * - the maximum number of allowed extra threads to maintain target 235 * parallelism (default 256). 
236 * </ul> 237 * If no thread factory is supplied via a system property, then the 238 * common pool uses a factory that uses the system class loader as the 239 * {@linkplain Thread#getContextClassLoader() thread context class loader}. 240 * In addition, if a {@link SecurityManager} is present, then 241 * the common pool uses a factory supplying threads that have no 242 * {@link Permissions} enabled. 243 * 244 * Upon any error in establishing these settings, default parameters 245 * are used. It is possible to disable or limit the use of threads in 246 * the common pool by setting the parallelism property to zero, and/or 247 * using a factory that may return {@code null}. However doing so may 248 * cause unjoined tasks to never be executed. 249 * 250 * <p><b>Implementation notes</b>: This implementation restricts the 251 * maximum number of running threads to 32767. Attempts to create 252 * pools with greater than the maximum number result in 253 * {@code IllegalArgumentException}. 254 * 255 * <p>This implementation rejects submitted tasks (that is, by throwing 256 * {@link RejectedExecutionException}) only when the pool is shut down 257 * or internal resources have been exhausted. 258 * 259 * @author Doug Lea 260 */ 261 class ForkJoinPool : AbstractExecutorService { 262 263 /* 264 * Implementation Overview 265 * 266 * This class and its nested classes provide the main 267 * functionality and control for a set of worker threads: 268 * Submissions from non-FJ threads enter into submission queues. 269 * Workers take these tasks and typically split them into subtasks 270 * that may be stolen by other workers. Work-stealing based on 271 * randomized scans generally leads to better throughput than 272 * "work dealing" in which producers assign tasks to idle threads, 273 * in part because threads that have finished other tasks before 274 * the signalled thread wakes up (which can be a long time) can 275 * take the task instead. 
Preference rules give first priority to 276 * processing tasks from their own queues (LIFO or FIFO, depending 277 * on mode), then to randomized FIFO steals of tasks in other 278 * queues. This framework began as a vehicle for supporting 279 * tree-structured parallelism using work-stealing. Over time, 280 * its scalability advantages led to extensions and changes to 281 * better support more diverse usage contexts. Because most 282 * internal methods and nested classes are interrelated, their 283 * main rationale and descriptions are presented here; individual 284 * methods and nested classes contain only brief comments about 285 * details. 286 * 287 * WorkQueues 288 * ========== 289 * 290 * Most operations occur within work-stealing queues (in nested 291 * class WorkQueue). These are special forms of Deques that 292 * support only three of the four possible end-operations -- push, 293 * pop, and poll (aka steal), under the further constraints that 294 * push and pop are called only from the owning thread (or, as 295 * extended here, under a lock), while poll may be called from 296 * other threads. (If you are unfamiliar with them, you probably 297 * want to read Herlihy and Shavit's book "The Art of 298 * Multiprocessor Programming", chapter 16 describing these in 299 * more detail before proceeding.) The main work-stealing queue 300 * design is roughly similar to those in the papers "Dynamic 301 * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005 302 * (http://research.sun.com/scalable/pubs/index.html) and 303 * "Idempotent work stealing" by Michael, Saraswat, and Vechev, 304 * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186). 305 * The main differences ultimately stem from GC requirements that 306 * we null out taken slots as soon as we can, to maintain as small 307 * a footprint as possible even in programs generating huge 308 * numbers of tasks. 
To accomplish this, we shift the CAS 309 * arbitrating pop vs poll (steal) from being on the indices 310 * ("base" and "top") to the slots themselves. 311 * 312 * Adding tasks then takes the form of a classic array push(task) 313 * in a circular buffer: 314 * q.array[q.top++ % length] = task; 315 * 316 * (The actual code needs to null-check and size-check the array, 317 * uses masking, not mod, for indexing a power-of-two-sized array, 318 * adds a release fence for publication, and possibly signals 319 * waiting workers to start scanning -- see below.) Both a 320 * successful pop and poll mainly entail a CAS of a slot from 321 * non-null to null. 322 * 323 * The pop operation (always performed by owner) is: 324 * if ((the task at top slot is not null) and 325 * (CAS slot to null)) 326 * decrement top and return task; 327 * 328 * And the poll operation (usually by a stealer) is 329 * if ((the task at base slot is not null) and 330 * (CAS slot to null)) 331 * increment base and return task; 332 * 333 * There are several variants of each of these. Most uses occur 334 * within operations that also interleave contention or emptiness 335 * tracking or inspection of elements before extracting them, so 336 * must interleave these with the above code. When performed by 337 * owner, getAndSet is used instead of CAS (see for example method 338 * nextLocalTask) which is usually more efficient, and possible 339 * because the top index cannot independently change during the 340 * operation. 341 * 342 * Memory ordering. See "Correct and Efficient Work-Stealing for 343 * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013 344 * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an 345 * analysis of memory ordering requirements in work-stealing 346 * algorithms similar to (but different than) the one used here. 347 * Extracting tasks in array slots via (fully fenced) CAS provides 348 * primary synchronization. 
The base and top indices imprecisely 349 * guide where to extract from. We do not usually require strict 350 * orderings of array and index updates. Many index accesses use 351 * plain mode, with ordering constrained by surrounding context 352 * (usually with respect to element CASes or the two WorkQueue 353 * fields source and phase). When not otherwise already 354 * constrained, reads of "base" by queue owners use acquire-mode, 355 * and some externally callable methods preface accesses with 356 * acquire fences. Additionally, to ensure that index update 357 * writes are not coalesced or postponed in loops etc, "opaque" 358 * mode is used in a few cases where timely writes are not 359 * otherwise ensured. The "locked" versions of push- and pop- 360 * based methods for shared queues differ from owned versions 361 * because locking already forces some of the ordering. 362 * 363 * Because indices and slot contents cannot always be consistent, 364 * a check that base == top indicates (momentary) emptiness, but 365 * otherwise may err on the side of possibly making the queue 366 * appear nonempty when a push, pop, or poll have not fully 367 * committed, or making it appear empty when an update of top has 368 * not yet been visibly written. (Method isEmpty() checks the 369 * case of a partially completed removal of the last element.) 370 * Because of this, the poll operation, considered individually, 371 * is not wait-free. One thief cannot successfully continue until 372 * another in-progress one (or, if previously empty, a push) 373 * visibly completes. This can stall threads when required to 374 * consume from a given queue (see method poll()). However, in 375 * the aggregate, we ensure at least probabilistic 376 * non-blockingness. If an attempted steal fails, a scanning 377 * thief chooses a different random victim target to try next. 
So, 378 * in order for one thief to progress, it suffices for any 379 * in-progress poll or new push on any empty queue to complete. 380 * 381 * This approach also enables support of a user mode in which 382 * local task processing is in FIFO, not LIFO order, simply by 383 * using poll rather than pop. This can be useful in 384 * message-passing frameworks in which tasks are never joined. 385 * 386 * WorkQueues are also used in a similar way for tasks submitted 387 * to the pool. We cannot mix these tasks in the same queues used 388 * by workers. Instead, we randomly associate submission queues 389 * with submitting threads, using a form of hashing. The 390 * ThreadLocalRandom probe value serves as a hash code for 391 * choosing existing queues, and may be randomly repositioned upon 392 * contention with other submitters. In essence, submitters act 393 * like workers except that they are restricted to executing local 394 * tasks that they submitted. Insertion of tasks in shared mode 395 * requires a lock but we use only a simple spinlock (using field 396 * phase), because submitters encountering a busy queue move to a 397 * different position to use or create other queues -- they block 398 * only when creating and registering new queues. Because it is 399 * used only as a spinlock, unlocking requires only a "releasing" 400 * store (using setRelease) unless otherwise signalling. 401 * 402 * Management 403 * ========== 404 * 405 * The main throughput advantages of work-stealing stem from 406 * decentralized control -- workers mostly take tasks from 407 * themselves or each other, at rates that can exceed a billion 408 * per second. The pool itself creates, activates (enables 409 * scanning for and running tasks), deactivates, blocks, and 410 * terminates threads, all with minimal central information. 
411 * There are only a few properties that we can globally track or 412 * maintain, so we pack them into a small number of variables, 413 * often maintaining atomicity without blocking or locking. 414 * Nearly all essentially atomic control state is held in a few 415 * variables that are by far most often read (not 416 * written) as status and consistency checks. We pack as much 417 * information into them as we can. 418 * 419 * Field "ctl" contains 64 bits holding information needed to 420 * atomically decide to add, enqueue (on an event queue), and 421 * dequeue and release workers. To enable this packing, we 422 * restrict maximum parallelism to (1<<15)-1 (which is far in 423 * excess of normal operating range) to allow ids, counts, and 424 * their negations (used for thresholding) to fit into 16bit 425 * subfields. 426 * 427 * Field "mode" holds configuration parameters as well as lifetime 428 * status, atomically and monotonically setting SHUTDOWN, STOP, 429 * and finally TERMINATED bits. 430 * 431 * Field "workQueues" holds references to WorkQueues. It is 432 * updated (only during worker creation and termination) under 433 * lock (using field workerNamePrefix as lock), but is otherwise 434 * concurrently readable, and accessed directly. We also ensure 435 * that uses of the array reference itself never become too stale 436 * in case of resizing, by arranging that (re-)reads are separated 437 * by at least one acquiring read access. To simplify index-based 438 * operations, the array size is always a power of two, and all 439 * readers must tolerate null slots. Worker queues are at odd 440 * indices. Shared (submission) queues are at even indices, up to 441 * a maximum of 64 slots, to limit growth even if the array needs 442 * to expand to add more workers. Grouping them together in this 443 * way simplifies and speeds up task scanning. 
444 * 445 * All worker thread creation is on-demand, triggered by task 446 * submissions, replacement of terminated workers, and/or 447 * compensation for blocked workers. However, all other support 448 * code is set up to work with other policies. To ensure that we 449 * do not hold on to worker references that would prevent GC, all 450 * accesses to workQueues are via indices into the workQueues 451 * array (which is one source of some of the messy code 452 * constructions here). In essence, the workQueues array serves as 453 * a weak reference mechanism. Thus for example the stack top 454 * subfield of ctl stores indices, not references. 455 * 456 * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we 457 * cannot let workers spin indefinitely scanning for tasks when 458 * none can be found immediately, and we cannot start/resume 459 * workers unless there appear to be tasks available. On the 460 * other hand, we must quickly prod them into action when new 461 * tasks are submitted or generated. In many usages, ramp-up time 462 * is the main limiting factor in overall performance, which is 463 * compounded at program start-up by JIT compilation and 464 * allocation. So we streamline this as much as possible. 465 * 466 * The "ctl" field atomically maintains total worker and 467 * "released" worker counts, plus the head of the available worker 468 * queue (actually stack, represented by the lower 32bit subfield 469 * of ctl). Released workers are those known to be scanning for 470 * and/or running tasks. Unreleased ("available") workers are 471 * recorded in the ctl stack. These workers are made available for 472 * signalling by enqueuing in ctl (see method runWorker). The 473 * "queue" is a form of Treiber stack. 
This is ideal for 474 * activating threads in most-recently used order, and improves 475 * performance and locality, outweighing the disadvantages of 476 * being prone to contention and inability to release a worker 477 * unless it is topmost on stack. To avoid missed signal problems 478 * inherent in any wait/signal design, available workers rescan 479 * for (and if found run) tasks after enqueuing. Normally their 480 * release status will be updated while doing so, but the released 481 * worker ctl count may underestimate the number of active 482 * threads. (However, it is still possible to determine quiescence 483 * via a validation traversal -- see isQuiescent). After an 484 * unsuccessful rescan, available workers are blocked until 485 * signalled (see signalWork). The top stack state holds the 486 * value of the "phase" field of the worker: its index and status, 487 * plus a version counter that, in addition to the count subfields 488 * (also serving as version stamps), provides protection against 489 * Treiber stack ABA effects. 490 * 491 * Creating workers. To create a worker, we pre-increment counts 492 * (serving as a reservation), and attempt to construct a 493 * ForkJoinWorkerThread via its factory. Upon construction, the 494 * new thread invokes registerWorker, where it constructs a 495 * WorkQueue and is assigned an index in the workQueues array 496 * (expanding the array if necessary). The thread is then started. 497 * Upon any exception across these steps, or null return from 498 * factory, deregisterWorker adjusts counts and records 499 * accordingly. If a null return, the pool continues running with 500 * fewer than the target number of workers. If exceptional, the 501 * exception is propagated, generally to some external caller. 502 * Worker index assignment avoids the bias in scanning that would 503 * occur if entries were sequentially packed starting at the front 504 * of the workQueues array. 
We treat the array as a simple 505 * power-of-two hash table, expanding as needed. The seedIndex 506 * increment ensures no collisions until a resize is needed or a 507 * worker is deregistered and replaced, and thereafter keeps 508 * probability of collision low. We cannot use 509 * ThreadLocalRandom.getProbe() for similar purposes here because 510 * the thread has not started yet, but do so for creating 511 * submission queues for existing external threads (see 512 * externalPush). 513 * 514 * WorkQueue field "phase" is used by both workers and the pool to 515 * manage and track whether a worker is UNSIGNALLED (possibly 516 * blocked waiting for a signal). When a worker is enqueued its 517 * phase field is set. Note that phase field updates lag queue CAS 518 * releases so usage requires care -- seeing a negative phase does 519 * not guarantee that the worker is available. When queued, the 520 * lower 16 bits of scanState must hold its pool index. So we 521 * place the index there upon initialization and otherwise keep it 522 * there or restore it when necessary. 523 * 524 * The ctl field also serves as the basis for memory 525 * synchronization surrounding activation. This uses a more 526 * efficient version of a Dekker-like rule that task producers and 527 * consumers sync with each other by both writing/CASing ctl (even 528 * if to its current value). This would be extremely costly. So 529 * we relax it in several ways: (1) Producers only signal when 530 * their queue is possibly empty at some point during a push 531 * operation (which requires conservatively checking size zero or 532 * one to cover races). (2) Other workers propagate this signal 533 * when they find tasks in a queue with size greater than one. (3) 534 * Workers only enqueue after scanning (see below) and not finding 535 * any tasks. 
(4) Rather than CASing ctl to its current value in 536 * the common case where no action is required, we reduce write 537 * contention by equivalently prefacing signalWork when called by 538 * an external task producer using a memory access with 539 * full-semantics or a "fullFence". 540 * 541 * Almost always, too many signals are issued, in part because a 542 * task producer cannot tell if some existing worker is in the 543 * midst of finishing one task (or already scanning) and ready to 544 * take another without being signalled. So the producer might 545 * instead activate a different worker that does not find any 546 * work, and then inactivates. This scarcely matters in 547 * steady-state computations involving all workers, but can create 548 * contention and bookkeeping bottlenecks during ramp-up, 549 * ramp-down, and small computations involving only a few workers. 550 * 551 * Scanning. Method scan (from runWorker) performs top-level 552 * scanning for tasks. (Similar scans appear in helpQuiesce and 553 * pollScan.) Each scan traverses and tries to poll from each 554 * queue starting at a random index. Scans are not performed in 555 * ideal random permutation order, to reduce cacheline 556 * contention. The pseudorandom generator need not have 557 * high-quality statistical properties in the long term, but just 558 * within computations; We use Marsaglia XorShifts (often via 559 * ThreadLocalRandom.nextSecondarySeed), which are cheap and 560 * suffice. Scanning also includes contention reduction: When 561 * scanning workers fail to extract an apparently existing task, 562 * they soon restart at a different pseudorandom index. This form 563 * of backoff improves throughput when many threads are trying to 564 * take tasks from few queues, which can be common in some usages. 
565 * Scans do not otherwise explicitly take into account core 566 * affinities, loads, cache localities, etc. However, they do 567 * exploit temporal locality (which usually approximates these) by 568 * preferring to re-poll from the same queue after a successful 569 * poll before trying others (see method topLevelExec). However, 570 * this preference is bounded (see TOP_BOUND_SHIFT) as a safeguard 571 * against infinitely unfair looping under unbounded user task 572 * recursion, and also to reduce long-term contention when many 573 * threads poll few queues holding many small tasks. The bound is 574 * high enough to avoid much impact on locality and scheduling 575 * overhead. 576 * 577 * Trimming workers. To release resources after periods of lack of 578 * use, a worker starting to wait when the pool is quiescent will 579 * time out and terminate (see method runWorker) if the pool has 580 * remained quiescent for the period given by field keepAlive. 581 * 582 * Shutdown and Termination. A call to shutdownNow invokes 583 * tryTerminate to atomically set a runState bit. The calling 584 * thread, as well as every other worker thereafter terminating, 585 * helps terminate others by cancelling their unprocessed tasks, 586 * and waking them up, doing so repeatedly until stable. Calls to 587 * non-abrupt shutdown() preface this by checking whether 588 * termination should commence by sweeping through queues (until 589 * stable) to ensure lack of in-flight submissions and workers 590 * about to process them before triggering the "STOP" phase of 591 * termination. 592 * 593 * Joining Tasks 594 * ============= 595 * 596 * Any of several actions may be taken when one worker is waiting 597 * to join a task stolen (or always held) by another. Because we 598 * are multiplexing many tasks on to a pool of workers, we can't 599 * always just let them block (as in Thread.join). 
We also cannot 600 * just reassign the joiner's run-time stack with another and 601 * replace it later, which would be a form of "continuation", that 602 * even if possible is not necessarily a good idea since we may 603 * need both an unblocked task and its continuation to progress. 604 * Instead we combine two tactics: 605 * 606 * Helping: Arranging for the joiner to execute some task that it 607 * would be running if the steal had not occurred. 608 * 609 * Compensating: Unless there are already enough live threads, 610 * method tryCompensate() may create or re-activate a spare 611 * thread to compensate for blocked joiners until they unblock. 612 * 613 * A third form (implemented in tryRemoveAndExec) amounts to 614 * helping a hypothetical compensator: If we can readily tell that 615 * a possible action of a compensator is to steal and execute the 616 * task being joined, the joining thread can do so directly, 617 * without the need for a compensation thread. 618 * 619 * The ManagedBlocker extension API can't use helping so relies 620 * only on compensation in method awaitBlocker. 621 * 622 * The algorithm in awaitJoin entails a form of "linear helping". 623 * Each worker records (in field source) the id of the queue from 624 * which it last stole a task. The scan in method awaitJoin uses 625 * these markers to try to find a worker to help (i.e., steal back 626 * a task from and execute it) that could hasten completion of the 627 * actively joined task. Thus, the joiner executes a task that 628 * would be on its own local deque if the to-be-joined task had 629 * not been stolen. This is a conservative variant of the approach 630 * described in Wagner & Calder "Leapfrogging: a portable 631 * technique for implementing efficient futures" SIGPLAN Notices, 632 * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs 633 * mainly in that we only record queue ids, not full dependency 634 * links. 
     * This requires a linear scan of the workQueues array to
     * locate stealers, but isolates cost to when it is needed, rather
     * than adding to per-task overhead. Searches can fail to locate
     * stealers when GC stalls and the like delay recording sources.
     * Further, even when accurately identified, stealers might not
     * ever produce a task that the joiner can in turn help with. So,
     * compensation is tried upon failure to find tasks to run.
     *
     * Compensation does not by default aim to keep exactly the target
     * parallelism number of unblocked threads running at any given
     * time. Some previous versions of this class employed immediate
     * compensations for any blocked join. However, in practice, the
     * vast majority of blockages are byproducts of GC and
     * other JVM or OS activities that are made worse by replacement
     * when they cause longer-term oversubscription. Rather than
     * impose arbitrary policies, we allow users to override the
     * default of only adding threads upon apparent starvation. The
     * compensation mechanism may also be bounded. Bounds for the
     * commonPool (see COMMON_MAX_SPARES) better enable JVMs to cope
     * with programming errors and abuse before running out of
     * resources to do so.
     *
     * Common Pool
     * ===========
     *
     * The static common pool always exists after static
     * initialization. Since it (or any other created pool) need
     * never be used, we minimize initial construction overhead and
     * footprint to the setup of about a dozen fields.
     *
     * When external threads submit to the common pool, they can
     * perform subtask processing (see externalHelpComplete and
     * related methods) upon joins. This caller-helps policy makes it
     * sensible to set common pool parallelism level to one (or more)
     * less than the total number of available cores, or even zero for
     * pure caller-runs.
We do not need to record whether external 670 * submissions are to the common pool -- if not, external help 671 * methods return quickly. These submitters would otherwise be 672 * blocked waiting for completion, so the extra effort (with 673 * liberally sprinkled task status checks) in inapplicable cases 674 * amounts to an odd form of limited spin-wait before blocking in 675 * ForkJoinTask.join. 676 * 677 * As a more appropriate default in managed environments, unless 678 * overridden by system properties, we use workers of subclass 679 * InnocuousForkJoinWorkerThread when there is a SecurityManager 680 * present. These workers have no permissions set, do not belong 681 * to any user-defined ThreadGroupEx, and erase all ThreadLocals 682 * after executing any top-level task (see 683 * WorkQueue.afterTopLevelExec). The associated mechanics (mainly 684 * in ForkJoinWorkerThread) may be JVM-dependent and must access 685 * particular Thread class fields to achieve this effect. 686 * 687 * Memory placement 688 * ================ 689 * 690 * Performance can be very sensitive to placement of instances of 691 * ForkJoinPool and WorkQueues and their queue arrays. To reduce 692 * false-sharing impact, the @Contended annotation isolates 693 * adjacent WorkQueue instances, as well as the ForkJoinPool.ctl 694 * field. WorkQueue arrays are allocated (by their threads) with 695 * larger initial sizes than most ever need, mostly to reduce 696 * false sharing with current garbage collectors that use cardmark 697 * tables. 698 * 699 * Style notes 700 * =========== 701 * 702 * Memory ordering relies mainly on VarHandles. This can be 703 * awkward and ugly, but also reflects the need to control 704 * outcomes across the unusual cases that arise in very racy code 705 * with very few invariants. All fields are read into locals 706 * before use, and null-checked if they are references. 
Array 707 * accesses using masked indices include checks (that are always 708 * true) that the array length is non-zero to avoid compilers 709 * inserting more expensive traps. This is usually done in a 710 * "C"-like style of listing declarations at the heads of methods 711 * or blocks, and using inline assignments on first encounter. 712 * Nearly all explicit checks lead to bypass/return, not exception 713 * throws, because they may legitimately arise due to 714 * cancellation/revocation during shutdown. 715 * 716 * There is a lot of representation-level coupling among classes 717 * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The 718 * fields of WorkQueue maintain data structures managed by 719 * ForkJoinPool, so are directly accessed. There is little point 720 * trying to reduce this, since any associated future changes in 721 * representations will need to be accompanied by algorithmic 722 * changes anyway. Several methods intrinsically sprawl because 723 * they must accumulate sets of consistent reads of fields held in 724 * local variables. Some others are artificially broken up to 725 * reduce producer/consumer imbalances due to dynamic compilation. 726 * There are also other coding oddities (including several 727 * unnecessary-looking hoisted null checks) that help some methods 728 * perform reasonably even when interpreted (not compiled). 729 * 730 * The order of declarations in this file is (with a few exceptions): 731 * (1) Static utility functions 732 * (2) Nested (static) classes 733 * (3) Static fields 734 * (4) Fields, along with constants used when unpacking some of them 735 * (5) Internal control methods 736 * (6) Callbacks and other support for ForkJoinTask methods 737 * (7) Exported methods 738 * (8) Static block initializing statics in minimally dependent order 739 */ 740 741 // Static utilities 742 743 /** 744 * If there is a security manager, makes sure caller has 745 * permission to modify threads. 
746 */ 747 // private static void checkPermission() { 748 // SecurityManager security = System.getSecurityManager(); 749 // if (security !is null) 750 // security.checkPermission(modifyThreadPermission); 751 // } 752 753 754 // static fields (initialized in static initializer below) 755 756 /** 757 * Creates a new ForkJoinWorkerThread. This factory is used unless 758 * overridden in ForkJoinPool constructors. 759 */ 760 __gshared ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory; 761 762 /** 763 * Permission required for callers of methods that may start or 764 * kill threads. 765 */ 766 // __gshared RuntimePermission modifyThreadPermission; 767 768 /** 769 * Common (static) pool. Non-null for use unless a static 770 * construction exception, but internal usages null-check on use 771 * to paranoically avoid potential initialization circularities 772 * as well as to simplify generated code. 773 */ 774 __gshared ForkJoinPool common; 775 776 /** 777 * Common pool parallelism. To allow simpler use and management 778 * when common pool threads are disabled, we allow the underlying 779 * common.parallelism field to be zero, but in that case still report 780 * parallelism as 1 to reflect resulting caller-runs mechanics. 781 */ 782 __gshared int COMMON_PARALLELISM; 783 784 /** 785 * Limit on spare thread construction in tryCompensate. 786 */ 787 private __gshared int COMMON_MAX_SPARES; 788 789 /** 790 * Sequence number for creating workerNamePrefix. 791 */ 792 private shared static int poolNumberSequence; 793 794 /** 795 * Returns the next sequence number. We don't expect this to 796 * ever contend, so use simple builtin sync. 
797 */ 798 private static int nextPoolId() { 799 return AtomicHelper.increment(poolNumberSequence); 800 } 801 802 // static configuration constants 803 804 /** 805 * Default idle timeout value (in milliseconds) for the thread 806 * triggering quiescence to park waiting for new work 807 */ 808 private enum long DEFAULT_KEEPALIVE = 60_000L; 809 810 /** 811 * Undershoot tolerance for idle timeouts 812 */ 813 private enum long TIMEOUT_SLOP = 20L; 814 815 /** 816 * The default value for COMMON_MAX_SPARES. Overridable using the 817 * "hunt.concurrency.ForkJoinPool.common.maximumSpares" system 818 * property. The default value is far in excess of normal 819 * requirements, but also far short of MAX_CAP and typical OS 820 * thread limits, so allows JVMs to catch misuse/abuse before 821 * running out of resources needed to do so. 822 */ 823 private enum int DEFAULT_COMMON_MAX_SPARES = 256; 824 825 /** 826 * Increment for seed generators. See class ThreadLocal for 827 * explanation. 828 */ 829 private enum int SEED_INCREMENT = 0x9e3779b9; 830 831 /* 832 * Bits and masks for field ctl, packed with 4 16 bit subfields: 833 * RC: Number of released (unqueued) workers minus target parallelism 834 * TC: Number of total workers minus target parallelism 835 * SS: version count and status of top waiting thread 836 * ID: poolIndex of top of Treiber stack of waiters 837 * 838 * When convenient, we can extract the lower 32 stack top bits 839 * (including version bits) as sp=(int)ctl. The offsets of counts 840 * by the target parallelism and the positionings of fields makes 841 * it possible to perform the most common checks via sign tests of 842 * fields: When ac is negative, there are not enough unqueued 843 * workers, when tc is negative, there are not enough total 844 * workers. When sp is non-zero, there are waiting workers. To 845 * deal with possibly negative fields, we use casts in and out of 846 * "short" and/or signed shifts to maintain signedness. 
847 * 848 * Because it occupies uppermost bits, we can add one release count 849 * using getAndAddLong of RC_UNIT, rather than CAS, when returning 850 * from a blocked join. Other updates entail multiple subfields 851 * and masking, requiring CAS. 852 * 853 * The limits packed in field "bounds" are also offset by the 854 * parallelism level to make them comparable to the ctl rc and tc 855 * fields. 856 */ 857 858 // Lower and upper word masks 859 private enum long SP_MASK = 0xffffffffL; 860 private enum long UC_MASK = ~SP_MASK; 861 862 // Release counts 863 private enum int RC_SHIFT = 48; 864 private enum long RC_UNIT = 0x0001L << RC_SHIFT; 865 private enum long RC_MASK = 0xffffL << RC_SHIFT; 866 867 // Total counts 868 private enum int TC_SHIFT = 32; 869 private enum long TC_UNIT = 0x0001L << TC_SHIFT; 870 private enum long TC_MASK = 0xffffL << TC_SHIFT; 871 private enum long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign 872 873 // Instance fields 874 875 long stealCount; // collects worker nsteals 876 Duration keepAlive; // milliseconds before dropping if idle 877 int indexSeed; // next worker index 878 int bounds; // min, max threads packed as shorts 879 int mode; // parallelism, runstate, queue mode 880 WorkQueue[] workQueues; // main registry 881 string workerNamePrefix; // for worker thread string; sync lock 882 Object workerNameLocker; 883 ForkJoinWorkerThreadFactory factory; 884 UncaughtExceptionHandler ueh; // per-worker UEH 885 Predicate!(ForkJoinPool) saturate; 886 887 // @jdk.internal.vm.annotation.Contended("fjpctl") // segregate 888 shared long ctl; // main pool control 889 890 // Creating, registering and deregistering workers 891 892 /** 893 * Tries to construct and start one worker. Assumes that total 894 * count has already been incremented as a reservation. Invokes 895 * deregisterWorker on any failure. 
896 * 897 * @return true if successful 898 */ 899 private bool createWorker() { 900 ForkJoinWorkerThreadFactory fac = factory; 901 Throwable ex = null; 902 ForkJoinWorkerThread wt = null; 903 try { 904 if (fac !is null && (wt = fac.newThread(this)) !is null) { 905 wt.start(); 906 return true; 907 } 908 } catch (Throwable rex) { 909 ex = rex; 910 } 911 deregisterWorker(wt, ex); 912 return false; 913 } 914 915 /** 916 * Tries to add one worker, incrementing ctl counts before doing 917 * so, relying on createWorker to back out on failure. 918 * 919 * @param c incoming ctl value, with total count negative and no 920 * idle workers. On CAS failure, c is refreshed and retried if 921 * this holds (otherwise, a new worker is not needed). 922 */ 923 private void tryAddWorker(long c) { 924 do { 925 long nc = ((RC_MASK & (c + RC_UNIT)) | 926 (TC_MASK & (c + TC_UNIT))); 927 if (ctl == c && AtomicHelper.compareAndSet(this.ctl, c, nc)) { 928 // version(HUNT_CONCURRENCY_DEBUG) tracef("nc=%d, ctl=%d, c=%d", nc, ctl, c); 929 createWorker(); 930 break; 931 } 932 } while (((c = ctl) & ADD_WORKER) != 0L && cast(int)c == 0); 933 } 934 935 /** 936 * Callback from ForkJoinWorkerThread constructor to establish and 937 * record its WorkQueue. 
938 * 939 * @param wt the worker thread 940 * @return the worker's queue 941 */ 942 final WorkQueue registerWorker(ForkJoinWorkerThread wt) { 943 UncaughtExceptionHandler handler; 944 wt.isDaemon(true); // configure thread 945 if ((handler = ueh) !is null) 946 wt.setUncaughtExceptionHandler(handler); 947 int tid = 0; // for thread name 948 int idbits = mode & FIFO; 949 string prefix = workerNamePrefix; 950 WorkQueue w = new WorkQueue(this, wt); 951 if (prefix !is null) { 952 synchronized (this) { 953 WorkQueue[] ws = workQueues; 954 int n; 955 int s = indexSeed += SEED_INCREMENT; 956 idbits |= (s & ~(SMASK | FIFO | DORMANT)); 957 if (ws !is null && (n = cast(int)ws.length) > 1) { 958 int m = n - 1; 959 tid = m & ((s << 1) | 1); // odd-numbered indices 960 for (int probes = n >>> 1;;) { // find empty slot 961 WorkQueue q; 962 if ((q = ws[tid]) is null || q.phase == QUIET) 963 break; 964 else if (--probes == 0) { 965 tid = n | 1; // resize below 966 break; 967 } 968 else 969 tid = (tid + 2) & m; 970 } 971 w.phase = w.id = tid | idbits; // now publishable 972 973 if (tid < n) 974 ws[tid] = w; 975 else { // expand array 976 int an = n << 1; 977 WorkQueue[] as = new WorkQueue[an]; 978 as[tid] = w; 979 int am = an - 1; 980 for (int j = 0; j < n; ++j) { 981 WorkQueue v; // copy external queue 982 if ((v = ws[j]) !is null) // position may change 983 as[v.id & am & SQMASK] = v; 984 if (++j >= n) 985 break; 986 as[j] = ws[j]; // copy worker 987 } 988 workQueues = as; 989 } 990 } 991 } 992 wt.name(prefix ~ tid.to!string()); 993 } 994 return w; 995 } 996 997 /** 998 * Final callback from terminating worker, as well as upon failure 999 * to construct or start a worker. Removes record of worker from 1000 * array, and adjusts counts. If pool is shutting down, tries to 1001 * complete termination. 
1002 * 1003 * @param wt the worker thread, or null if construction failed 1004 * @param ex the exception causing failure, or null if none 1005 */ 1006 final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { 1007 WorkQueue w = null; 1008 int phase = 0; 1009 if (wt !is null && (w = wt.workQueue) !is null) { 1010 int wid = w.id; 1011 long ns = cast(long)w.nsteals & 0xffffffffL; 1012 if (!workerNamePrefix.empty()) { 1013 synchronized (this) { 1014 WorkQueue[] ws; size_t n, i; // remove index from array 1015 if ((ws = workQueues) !is null && (n = ws.length) > 0 && 1016 ws[i = wid & (n - 1)] == w) 1017 ws[i] = null; 1018 stealCount += ns; 1019 } 1020 } 1021 phase = w.phase; 1022 } 1023 if (phase != QUIET) { // else pre-adjusted 1024 long c; // decrement counts 1025 do {} while (!AtomicHelper.compareAndSet(this.ctl, c = ctl, 1026 ((RC_MASK & (c - RC_UNIT)) | 1027 (TC_MASK & (c - TC_UNIT)) | 1028 (SP_MASK & c)))); 1029 } 1030 if (w !is null) 1031 w.cancelAll(); // cancel remaining tasks 1032 1033 if (!tryTerminate(false, false) && // possibly replace worker 1034 w !is null && w.array !is null) // avoid repeated failures 1035 signalWork(); 1036 1037 if (ex !is null) // rethrow 1038 ForkJoinTaskHelper.rethrow(ex); 1039 } 1040 1041 /** 1042 * Tries to create or release a worker if too few are running. 
1043 */ 1044 final void signalWork() { 1045 for (;;) { 1046 long c; int sp; WorkQueue[] ws; int i; WorkQueue v; 1047 if ((c = ctl) >= 0L) // enough workers 1048 break; 1049 else if ((sp = cast(int)c) == 0) { // no idle workers 1050 if ((c & ADD_WORKER) != 0L) // too few workers 1051 tryAddWorker(c); 1052 break; 1053 } 1054 else if ((ws = workQueues) is null) 1055 break; // unstarted/terminated 1056 else if (ws.length <= (i = sp & SMASK)) 1057 break; // terminated 1058 else if ((v = ws[i]) is null) 1059 break; // terminating 1060 else { 1061 int np = sp & ~UNSIGNALLED; 1062 int vp = v.phase; 1063 long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT)); 1064 Thread vt = v.owner; 1065 if (sp == vp && AtomicHelper.compareAndSet(this.ctl, c, nc)) { 1066 v.phase = np; 1067 if (vt !is null && v.source < 0) 1068 LockSupport.unpark(vt); 1069 break; 1070 } 1071 } 1072 } 1073 } 1074 1075 /** 1076 * Tries to decrement counts (sometimes implicitly) and possibly 1077 * arrange for a compensating worker in preparation for blocking: 1078 * If not all core workers yet exist, creates one, else if any are 1079 * unreleased (possibly including caller) releases one, else if 1080 * fewer than the minimum allowed number of workers running, 1081 * checks to see that they are all active, and if so creates an 1082 * extra worker unless over maximum limit and policy is to 1083 * saturate. Most of these steps can fail due to interference, in 1084 * which case 0 is returned so caller will retry. A negative 1085 * return value indicates that the caller doesn't need to 1086 * re-adjust counts when later unblocked. 
1087 * 1088 * @return 1: block then adjust, -1: block without adjust, 0 : retry 1089 */ 1090 private int tryCompensate(WorkQueue w) { 1091 int t, n, sp; 1092 long c = ctl; 1093 WorkQueue[] ws = workQueues; 1094 if ((t = cast(short)(c >>> TC_SHIFT)) >= 0) { 1095 if (ws is null || (n = cast(int)ws.length) <= 0 || w is null) 1096 return 0; // disabled 1097 else if ((sp = cast(int)c) != 0) { // replace or release 1098 WorkQueue v = ws[sp & (n - 1)]; 1099 int wp = w.phase; 1100 long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c); 1101 int np = sp & ~UNSIGNALLED; 1102 if (v !is null) { 1103 int vp = v.phase; 1104 Thread vt = v.owner; 1105 long nc = (cast(long)v.stackPred & SP_MASK) | uc; 1106 if (vp == sp && AtomicHelper.compareAndSet(this.ctl, c, nc)) { 1107 v.phase = np; 1108 if (vt !is null && v.source < 0) 1109 LockSupport.unpark(vt); 1110 return (wp < 0) ? -1 : 1; 1111 } 1112 } 1113 return 0; 1114 } 1115 else if (cast(int)(c >> RC_SHIFT) - // reduce parallelism 1116 cast(short)(bounds & SMASK) > 0) { 1117 long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c)); 1118 return AtomicHelper.compareAndSet(this.ctl, c, nc) ? 
1 : 0; 1119 } 1120 else { // validate 1121 int md = mode, pc = md & SMASK, tc = pc + t, bc = 0; 1122 bool unstable = false; 1123 for (int i = 1; i < n; i += 2) { 1124 WorkQueue q; ThreadEx wt; ThreadState ts; 1125 if ((q = ws[i]) !is null) { 1126 if (q.source == 0) { 1127 unstable = true; 1128 break; 1129 } 1130 else { 1131 --tc; 1132 if ((wt = q.owner) !is null && 1133 ((ts = wt.getState()) == ThreadState.BLOCKED || 1134 ts == ThreadState.WAITING)) 1135 ++bc; // worker is blocking 1136 } 1137 } 1138 } 1139 if (unstable || tc != 0 || ctl != c) 1140 return 0; // inconsistent 1141 else if (t + pc >= MAX_CAP || t >= (bounds >>> SWIDTH)) { 1142 Predicate!(ForkJoinPool) sat; 1143 if ((sat = saturate) !is null && sat(this)) 1144 return -1; 1145 else if (bc < pc) { // lagging 1146 Thread.yield(); // for retry spins 1147 return 0; 1148 } 1149 else 1150 throw new RejectedExecutionException( 1151 "Thread limit exceeded replacing blocked worker"); 1152 } 1153 } 1154 } 1155 1156 long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool 1157 return AtomicHelper.compareAndSet(this.ctl, c, nc) && createWorker() ? 1 : 0; 1158 } 1159 1160 /** 1161 * Top-level runloop for workers, called by ForkJoinWorkerThread.run. 1162 * See above for explanation. 
1163 */ 1164 final void runWorker(WorkQueue w) { 1165 int r = (w.id ^ ThreadLocalRandom.nextSecondarySeed()) | FIFO; // rng 1166 w.array = new IForkJoinTask[INITIAL_QUEUE_CAPACITY]; // initialize 1167 while(true) { 1168 int phase; 1169 if (scan(w, r)) { // scan until apparently empty 1170 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // move (xorshift) 1171 } 1172 else if ((phase = w.phase) >= 0) { // enqueue, then rescan 1173 long np = (w.phase = (phase + SS_SEQ) | UNSIGNALLED) & SP_MASK; 1174 long c, nc; 1175 do { 1176 w.stackPred = cast(int)(c = ctl); 1177 nc = ((c - RC_UNIT) & UC_MASK) | np; 1178 } while (!AtomicHelper.compareAndSet(this.ctl, c, nc)); 1179 1180 version(HUNT_CONCURRENCY_DEBUG) { 1181 // infof("ctl=%d, c=%d, nc=%d, stackPred=%d", ctl, c, nc, w.stackPred); 1182 } 1183 } 1184 else { // already queued 1185 1186 int pred = w.stackPred; 1187 ThreadEx.interrupted(); // clear before park 1188 w.source = DORMANT; // enable signal 1189 long c = ctl; 1190 int md = mode, rc = (md & SMASK) + cast(int)(c >> RC_SHIFT); 1191 1192 version(HUNT_CONCURRENCY_DEBUG) { 1193 // tracef("md=%d, rc=%d, c=%d, pred=%d, phase=%d", md, rc, c, pred, phase); 1194 } 1195 1196 if (md < 0) { // terminating 1197 break; 1198 } else if (rc <= 0 && (md & SHUTDOWN) != 0 && tryTerminate(false, false)) { 1199 break; // quiescent shutdown 1200 } else if (rc <= 0 && pred != 0 && phase == cast(int)c) { 1201 long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred); 1202 MonoTime d = MonoTime.currTime + keepAlive; // DateTime.currentTimeMillis(); 1203 LockSupport.parkUntil(this, d); 1204 if (ctl == c && // drop on timeout if all idle 1205 d - MonoTime.currTime <= TIMEOUT_SLOP.msecs && 1206 AtomicHelper.compareAndSet(this.ctl, c, nc)) { 1207 w.phase = QUIET; 1208 break; 1209 } 1210 } else if (w.phase < 0) { 1211 LockSupport.park(this); // OK if spuriously woken 1212 break; 1213 } 1214 w.source = 0; // disable signal 1215 } 1216 } 1217 } 1218 1219 /** 1220 * Scans for and if found executes one or 
more top-level tasks from a queue. 1221 * 1222 * @return true if found an apparently non-empty queue, and 1223 * possibly ran task(s). 1224 */ 1225 private bool scan(WorkQueue w, int r) { 1226 WorkQueue[] ws; int n; 1227 if ((ws = workQueues) !is null && (n = cast(int)ws.length) > 0 && w !is null) { 1228 for (int m = n - 1, j = r & m;;) { 1229 WorkQueue q; int b; 1230 if ((q = ws[j]) !is null && q.top != (b = q.base)) { 1231 int qid = q.id; 1232 IForkJoinTask[] a; 1233 size_t cap, k; 1234 IForkJoinTask t; 1235 1236 if ((a = q.array) !is null && (cap = cast(int)a.length) > 0) { 1237 k = (cap - 1) & b; 1238 // import core.atomic; 1239 // auto ss = core.atomic.atomicLoad((cast(shared)a[k])); 1240 // FIXME: Needing refactor or cleanup -@zxp at 2/6/2019, 5:12:19 PM 1241 // IForkJoinTask tt = a[k]; 1242 // t = AtomicHelper.load(tt); 1243 t = a[k]; 1244 // tracef("k=%d, t is null: %s", k, t is null); 1245 if (q.base == b++ && t !is null && 1246 AtomicHelper.compareAndSet(a[k], t, cast(IForkJoinTask)null)) { 1247 q.base = b; 1248 w.source = qid; 1249 if (q.top - b > 0) signalWork(); 1250 1251 // infof("IForkJoinTask: %s", typeid(cast(Object)t)); 1252 w.topLevelExec(t, q, // random fairness bound 1253 r & ((n << TOP_BOUND_SHIFT) - 1)); 1254 } 1255 } 1256 return true; 1257 } 1258 else if (--n > 0) { 1259 j = (j + 1) & m; 1260 } 1261 else { 1262 break; 1263 } 1264 } 1265 } 1266 return false; 1267 } 1268 1269 /** 1270 * Helps and/or blocks until the given task is done or timeout. 1271 * First tries locally helping, then scans other queues for a task 1272 * produced by one of w's stealers; compensating and blocking if 1273 * none are found (rescanning if tryCompensate fails). 
1274 * 1275 * @param w caller 1276 * @param task the task 1277 * @param deadline for timed waits, if nonzero 1278 * @return task status on exit 1279 */ 1280 final int awaitJoin(WorkQueue w, IForkJoinTask task, MonoTime deadline) { 1281 if(w is null || task is null) { 1282 return 0; 1283 } 1284 int s = 0; 1285 int seed = ThreadLocalRandom.nextSecondarySeed(); 1286 ICountedCompleter cc = cast(ICountedCompleter)task; 1287 if(cc !is null) { 1288 s = w.helpCC(cc, 0, false); 1289 if(s<0) 1290 return 0; 1291 } 1292 1293 w.tryRemoveAndExec(task); 1294 int src = w.source, id = w.id; 1295 int r = (seed >>> 16) | 1, step = (seed & ~1) | 2; 1296 s = task.getStatus(); 1297 while (s >= 0) { 1298 WorkQueue[] ws; 1299 int n = (ws = workQueues) is null ? 0 : cast(int)ws.length, m = n - 1; 1300 while (n > 0) { 1301 WorkQueue q; int b; 1302 if ((q = ws[r & m]) !is null && q.source == id && 1303 q.top != (b = q.base)) { 1304 IForkJoinTask[] a; int cap, k; 1305 int qid = q.id; 1306 if ((a = q.array) !is null && (cap = cast(int)a.length) > 0) { 1307 k = (cap - 1) & b; 1308 // FIXME: Needing refactor or cleanup -@zxp at 2/6/2019, 5:13:08 PM 1309 // 1310 // IForkJoinTask t = AtomicHelper.load(a[k]); 1311 IForkJoinTask t = a[k]; 1312 if (q.source == id && q.base == b++ && 1313 t !is null && AtomicHelper.compareAndSet(a[k], t, cast(IForkJoinTask)null)) { 1314 q.base = b; 1315 w.source = qid; 1316 t.doExec(); 1317 w.source = src; 1318 } 1319 } 1320 break; 1321 } 1322 else { 1323 r += step; 1324 --n; 1325 } 1326 } 1327 1328 if ((s = task.getStatus()) < 0) 1329 break; 1330 else if (n == 0) { // empty scan 1331 long ms; int block; 1332 Duration ns; 1333 if (deadline == MonoTime.zero()) 1334 ms = 0L; // untimed 1335 else if ((ns = deadline - MonoTime.currTime) <= Duration.zero()) 1336 break; // timeout 1337 else if ((ms = ns.total!(TimeUnit.Millisecond)()) <= 0L) 1338 ms = 1L; // avoid 0 for timed wait 1339 if ((block = tryCompensate(w)) != 0) { 1340 task.internalWait(ms); 1341 
AtomicHelper.getAndAdd(this.ctl, (block > 0) ? RC_UNIT : 0L); 1342 } 1343 s = task.getStatus(); 1344 } 1345 } 1346 return s; 1347 } 1348 1349 /** 1350 * Runs tasks until {@code isQuiescent()}. Rather than blocking 1351 * when tasks cannot be found, rescans until all others cannot 1352 * find tasks either. 1353 */ 1354 final void helpQuiescePool(WorkQueue w) { 1355 int prevSrc = w.source; 1356 int seed = ThreadLocalRandom.nextSecondarySeed(); 1357 int r = seed >>> 16, step = r | 1; 1358 for (int source = prevSrc, released = -1;;) { // -1 until known 1359 IForkJoinTask localTask; WorkQueue[] ws; 1360 while ((localTask = w.nextLocalTask()) !is null) 1361 localTask.doExec(); 1362 if (w.phase >= 0 && released == -1) 1363 released = 1; 1364 bool quiet = true, empty = true; 1365 int n = (ws = workQueues) is null ? 0 : cast(int)ws.length; 1366 for (int m = n - 1; n > 0; r += step, --n) { 1367 WorkQueue q; int b; 1368 if ((q = ws[r & m]) !is null) { 1369 int qs = q.source; 1370 if (q.top != (b = q.base)) { 1371 quiet = empty = false; 1372 IForkJoinTask[] a; int cap, k; 1373 int qid = q.id; 1374 if ((a = q.array) !is null && (cap = cast(int)a.length) > 0) { 1375 if (released == 0) { // increment 1376 released = 1; 1377 AtomicHelper.getAndAdd(this.ctl, RC_UNIT); 1378 } 1379 k = (cap - 1) & b; 1380 // IForkJoinTask t = AtomicHelper.load(a[k]); 1381 // FIXME: Needing refactor or cleanup -@zxp at 2/6/2019, 9:32:07 PM 1382 // 1383 IForkJoinTask t = a[k]; 1384 if (q.base == b++ && t !is null) { 1385 if(AtomicHelper.compareAndSet(a[k], t, null)) { 1386 q.base = b; 1387 w.source = qid; 1388 t.doExec(); 1389 w.source = source = prevSrc; 1390 } 1391 } 1392 } 1393 break; 1394 } 1395 else if ((qs & QUIET) == 0) 1396 quiet = false; 1397 } 1398 } 1399 if (quiet) { 1400 if (released == 0) { 1401 AtomicHelper.getAndAdd(this.ctl, RC_UNIT); 1402 } 1403 w.source = prevSrc; 1404 break; 1405 } 1406 else if (empty) { 1407 if (source != QUIET) 1408 w.source = source = QUIET; 1409 if (released == 
1) { // decrement 1410 released = 0; 1411 AtomicHelper.getAndAdd(this.ctl, RC_MASK & -RC_UNIT); 1412 } 1413 } 1414 } 1415 } 1416 1417 /** 1418 * Scans for and returns a polled task, if available. 1419 * Used only for untracked polls. 1420 * 1421 * @param submissionsOnly if true, only scan submission queues 1422 */ 1423 private IForkJoinTask pollScan(bool submissionsOnly) { 1424 WorkQueue[] ws; int n; 1425 rescan: while ((mode & STOP) == 0 && (ws = workQueues) !is null && 1426 (n = cast(int)ws.length) > 0) { 1427 int m = n - 1; 1428 int r = ThreadLocalRandom.nextSecondarySeed(); 1429 int h = r >>> 16; 1430 int origin, step; 1431 if (submissionsOnly) { 1432 origin = (r & ~1) & m; // even indices and steps 1433 step = (h & ~1) | 2; 1434 } 1435 else { 1436 origin = r & m; 1437 step = h | 1; 1438 } 1439 bool nonempty = false; 1440 for (int i = origin, oldSum = 0, checkSum = 0;;) { 1441 WorkQueue q; 1442 if ((q = ws[i]) !is null) { 1443 int b; IForkJoinTask t; 1444 if (q.top - (b = q.base) > 0) { 1445 nonempty = true; 1446 if ((t = q.poll()) !is null) 1447 return t; 1448 } 1449 else 1450 checkSum += b + q.id; 1451 } 1452 if ((i = (i + step) & m) == origin) { 1453 if (!nonempty && oldSum == (oldSum = checkSum)) 1454 break rescan; 1455 checkSum = 0; 1456 nonempty = false; 1457 } 1458 } 1459 } 1460 return null; 1461 } 1462 1463 /** 1464 * Gets and removes a local or stolen task for the given worker. 1465 * 1466 * @return a task, if available 1467 */ 1468 final IForkJoinTask nextTaskFor(WorkQueue w) { 1469 IForkJoinTask t; 1470 if (w is null || (t = w.nextLocalTask()) is null) 1471 t = pollScan(false); 1472 return t; 1473 } 1474 1475 // External operations 1476 1477 /** 1478 * Adds the given task to a submission queue at submitter's 1479 * current queue, creating one if null or contended. 1480 * 1481 * @param task the task. Caller must ensure non-null. 
1482 */ 1483 final void externalPush(IForkJoinTask task) { 1484 // initialize caller's probe 1485 int r = ThreadLocalRandom.getProbe(); 1486 if (r == 0) { 1487 ThreadLocalRandom.localInit(); 1488 r = ThreadLocalRandom.getProbe(); 1489 } 1490 1491 for (;;) { 1492 WorkQueue q; 1493 int md = mode, n; 1494 WorkQueue[] ws = workQueues; 1495 if ((md & SHUTDOWN) != 0 || ws is null || (n = cast(int)ws.length) <= 0) 1496 throw new RejectedExecutionException(); 1497 else if ((q = ws[(n - 1) & r & SQMASK]) is null) { // add queue 1498 int qid = (r | QUIET) & ~(FIFO | OWNED); 1499 Object lock = workerNameLocker; 1500 IForkJoinTask[] qa = 1501 new IForkJoinTask[INITIAL_QUEUE_CAPACITY]; 1502 q = new WorkQueue(this, null); 1503 q.array = qa; 1504 q.id = qid; 1505 q.source = QUIET; 1506 if (lock !is null) { // unless disabled, lock pool to install 1507 synchronized (lock) { 1508 WorkQueue[] vs; int i, vn; 1509 if ((vs = workQueues) !is null && (vn = cast(int)vs.length) > 0 && 1510 vs[i = qid & (vn - 1) & SQMASK] is null) 1511 vs[i] = q; // else another thread already installed 1512 } 1513 } 1514 } 1515 else if (!q.tryLockPhase()) // move if busy 1516 r = ThreadLocalRandom.advanceProbe(r); 1517 else { 1518 if (q.lockedPush(task)) 1519 signalWork(); 1520 return; 1521 } 1522 } 1523 } 1524 1525 /** 1526 * Pushes a possibly-external submission. 
*/
    private IForkJoinTask externalSubmit(IForkJoinTask task) {
        if (task is null)
            throw new NullPointerException();
        // A worker of this pool pushes onto its own queue; anyone else goes
        // through the external submission path.
        ForkJoinWorkerThread worker = cast(ForkJoinWorkerThread) Thread.getThis();
        WorkQueue own = null;
        if (worker !is null && worker.pool is this)
            own = worker.workQueue;
        if (own is null)
            externalPush(task);
        else
            own.push(task);
        return task;
    }

    // Typed variant of the method above: same routing, but the caller gets
    // back the task with its result type intact.
    private ForkJoinTask!(T) externalSubmit(T)(ForkJoinTask!(T) task) {
        if (task is null)
            throw new NullPointerException();
        ForkJoinWorkerThread worker = cast(ForkJoinWorkerThread) Thread.getThis();
        WorkQueue own = null;
        if (worker !is null && worker.pool is this)
            own = worker.workQueue;
        if (own is null)
            externalPush(task);
        else
            own.push(task);
        return task;
    }

    /**
     * Returns the common pool's submission queue selected by the calling
     * thread's probe, or null if the common pool or its queue array is
     * absent or the slot is empty.
     */
    static WorkQueue commonSubmitterQueue() {
        ForkJoinPool pool = common;
        int probe = ThreadLocalRandom.getProbe();
        if (pool is null)
            return null;
        WorkQueue[] queues = pool.workQueues;
        if (queues is null)
            return null;
        int len = cast(int) queues.length;
        if (len <= 0)
            return null;
        return queues[(len - 1) & probe & SQMASK];
    }

    /**
     * Performs tryUnpush for an external submitter.
     *
     * @return false when no queue exists in the caller's slot, otherwise
     *         the result of that queue's tryLockedUnpush
     */
    final bool tryExternalUnpush(IForkJoinTask task) {
        int probe = ThreadLocalRandom.getProbe();
        WorkQueue[] queues = workQueues;
        if (queues is null)
            return false;
        int len = cast(int) queues.length;
        if (len <= 0)
            return false;
        WorkQueue slot = queues[(len - 1) & probe & SQMASK];
        return slot !is null && slot.tryLockedUnpush(task);
    }

    /**
     * Performs helpComplete for an external submitter.
     *
     * @return 0 when no queue exists in the caller's slot, otherwise
     *         the result of that queue's helpCC
     */
    final int externalHelpComplete(ICountedCompleter task, int maxTasks) {
        int probe = ThreadLocalRandom.getProbe();
        WorkQueue[] queues = workQueues;
        if (queues is null)
            return 0;
        int len = cast(int) queues.length;
        if (len <= 0)
            return 0;
        WorkQueue slot = queues[(len - 1) & probe & SQMASK];
        if (slot is null)
            return 0;
        return slot.helpCC(task, maxTasks, true);
    }

    /**
     * Tries to steal and run tasks within the target's computation.
     * The maxTasks argument supports external usages; internal calls
     * use zero, allowing unbounded steps (external calls trap
     * non-positive values).
     *
     * @param w caller
     * @param maxTasks if non-zero, the maximum number of other tasks to run
     * @return task status on exit (0 when {@code w} is null)
     */
    final int helpComplete(WorkQueue w, ICountedCompleter task,
            int maxTasks) {
        if (w is null)
            return 0;
        return w.helpCC(task, maxTasks, false);
    }

    /**
     * Returns a cheap heuristic guide for task partitioning when
     * programmers, frameworks, tools, or languages have little or no
     * idea about task granularity. In essence, by offering this
     * method, we ask users only about tradeoffs in overhead vs
     * expected throughput and its variance, rather than how finely to
     * partition tasks.
     *
     * In a steady state strict (tree-structured) computation, each
     * thread makes available for stealing enough tasks for other
     * threads to remain active. Inductively, if all threads play by
     * the same rules, each thread should make available only a
     * constant number of tasks.
     *
     * The minimum useful constant is just 1. But using a value of 1
     * would require immediate replenishment upon each steal to
     * maintain enough tasks, which is infeasible. Further,
     * partitionings/granularities of offered tasks should minimize
     * steal rates, which in general means that threads nearer the top
     * of computation tree should generate more than those nearer the
     * bottom. In perfect steady state, each thread is at
     * approximately the same level of computation tree. However,
     * producing extra tasks amortizes the uncertainty of progress and
     * diffusion assumptions.
*
     * So, users will want to use values larger (but not much larger)
     * than 1 to both smooth over shortages and hedge
     * against uneven progress; as traded off against the cost of
     * extra task overhead. We leave the user to pick a threshold
     * value to compare with the results of this call to guide
     * decisions, but recommend values such as 3.
     *
     * When all threads are active, it is on average OK to estimate
     * surplus strictly locally. In steady-state, if one thread is
     * maintaining say 2 surplus tasks, then so are others. So we can
     * just use estimated queue length. However, this strategy alone
     * leads to serious mis-estimates in some non-steady-state
     * conditions (ramp-up, ramp-down, other stalls). We can detect
     * many of these by further considering the number of "idle"
     * threads, that are known to have zero queued tasks, so
     * compensate by a factor of (#idle/#active) threads.
     */
    static int getSurplusQueuedTaskCount() {
        // Only meaningful when called from a worker thread that has a queue.
        ForkJoinWorkerThread worker = cast(ForkJoinWorkerThread) Thread.getThis();
        if (worker is null)
            return 0;
        ForkJoinPool pool = worker.pool;
        if (pool is null)
            return 0;
        WorkQueue queue = worker.workQueue;
        if (queue is null)
            return 0;

        int threshold = pool.mode & SMASK;
        int active = threshold + cast(int)(pool.ctl >> RC_SHIFT);
        int queued = queue.top - queue.base;

        // The threshold is halved before each comparison; the first one the
        // active count exceeds picks the deduction, and 8 applies if none does.
        static immutable int[4] deductions = [0, 1, 2, 4];
        int deduction = 8;
        foreach (candidate; deductions) {
            threshold >>>= 1;
            if (active > threshold) {
                deduction = candidate;
                break;
            }
        }
        return queued - deduction;
    }

    // Termination

    /**
     * Possibly initiates and/or completes termination.
*
     * @param now if true, unconditionally terminate, else only
     * if no work and no active workers
     * @param enable if true, terminate when next possible
     * @return true if terminating or terminated
     */
    private bool tryTerminate(bool now, bool enable) {
        int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED

        // Phase 1: set SHUTDOWN. Identity (`is`) is intended for the
        // common-pool test, not opEquals.
        while (((md = mode) & SHUTDOWN) == 0) {
            if (!enable || this is common) // cannot shutdown
                return false;
            else {
                AtomicHelper.compareAndSet(this.mode, md, md | SHUTDOWN);
            }
        }

        // Phase 2: set STOP.
        while (((md = mode) & STOP) == 0) { // try to initiate termination
            if (!now) { // check if quiescent & empty
                for (long oldSum = 0L;;) { // repeat until stable
                    bool running = false;
                    long checkSum = ctl;
                    WorkQueue[] ws = workQueues;
                    if ((md & SMASK) + cast(int)(checkSum >> RC_SHIFT) > 0)
                        running = true;
                    else if (ws !is null) {
                        WorkQueue w;
                        for (int i = 0; i < ws.length; ++i) {
                            if ((w = ws[i]) !is null) {
                                int s = w.source, p = w.phase;
                                int d = w.id, b = w.base;
                                if (b != w.top ||
                                    ((d & 1) == 1 && (s >= 0 || p >= 0))) {
                                    running = true;
                                    break; // working, scanning, or have work
                                }
                                checkSum += ((cast(long)s << 48) + (cast(long)p << 32) +
                                             (cast(long)b << 16) + cast(long)d);
                            }
                        }
                    }
                    if (((md = mode) & STOP) != 0)
                        break; // already triggered
                    else if (running)
                        return false;
                    else if (workQueues is ws) {
                        // `is` checks that the array itself was not replaced;
                        // `==` would compare the slots element by element.
                        // Compare against the previous checksum before
                        // recording the new one, so the result does not
                        // depend on operand evaluation order.
                        bool stable = (oldSum == checkSum);
                        oldSum = checkSum;
                        if (stable)
                            break;
                    }
                }
            }
            if ((md & STOP) == 0)
                AtomicHelper.compareAndSet(this.mode, md, md | STOP);
        }

        // Phase 3: set TERMINATED.
        while (((md = mode) & TERMINATED) == 0) { // help terminate others
            for (long oldSum = 0L;;) { // repeat until stable
                WorkQueue[] ws; WorkQueue w;
                long checkSum = ctl;
                if ((ws = workQueues) !is null) {
                    for (int i = 0; i < ws.length; ++i) {
                        if ((w = ws[i]) !is null) {
                            ForkJoinWorkerThread wt = w.owner;
                            w.cancelAll(); // clear queues
                            if (wt !is null) {
                                try { // unblock join or park
                                    wt.interrupt();
                                } catch (Throwable ignore) {
                                    // best effort: a failed interrupt must not stop termination
                                }
                            }
                            checkSum += (cast(long)w.phase << 32) + w.base;
                        }
                    }
                }
                md = mode;
                if ((md & TERMINATED) != 0)
                    break;
                if (workQueues is ws) { // same array instance, see phase 2
                    bool stable = (oldSum == checkSum);
                    oldSum = checkSum;
                    if (stable)
                        break;
                }
            }
            if ((md & TERMINATED) != 0)
                break;
            else if ((md & SMASK) + cast(short)(ctl >>> TC_SHIFT) > 0)
                break; // total-count term still positive: do not mark TERMINATED yet
            else if (AtomicHelper.compareAndSet(this.mode, md, md | TERMINATED)) {
                synchronized (this) {
                    // notifyAll(); // for awaitTermination
                    // TODO: Tasks pending completion -@zxp at 2/4/2019, 11:03:21 AM
                    //
                }
                break;
            }
        }
        return true;
    }

    // Exported methods

    // Constructors

    /**
     * Creates a {@code ForkJoinPool} with parallelism equal to
     * {@code min(MAX_CAP, totalCPUs)}, using defaults for all
     * other parameters (see the full constructor).
     */
    this() {
        this(min(MAX_CAP, totalCPUs),
             defaultForkJoinWorkerThreadFactory, null, false,
             0, MAX_CAP, 1, null, dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE));
    }

    /**
     * Creates a {@code ForkJoinPool} with the indicated parallelism
     * level, using defaults for all other parameters (see {@link
     * #ForkJoinPool(int, ForkJoinWorkerThreadFactory,
     * UncaughtExceptionHandler, bool, int, int, int, Predicate,
     * long, TimeUnit)}).
*
     * @param parallelism the parallelism level
     * @throws IllegalArgumentException if parallelism less than or
     *         equal to zero, or greater than implementation limit
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    this(int parallelism) {
        this(parallelism,
             defaultForkJoinWorkerThreadFactory, // factory
             null,                               // handler
             false,                              // asyncMode
             0,                                  // corePoolSize
             MAX_CAP,                            // maximumPoolSize
             1,                                  // minimumRunnable
             null,                               // saturate
             dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE));
    }

    /**
     * Creates a {@code ForkJoinPool} with the given parameters (using
     * defaults for others -- see {@link #ForkJoinPool(int,
     * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, bool,
     * int, int, int, Predicate, long, TimeUnit)}).
     *
     * @param parallelism the parallelism level. For default value,
     * use {@link java.lang.Runtime#availableProcessors}.
     * @param factory the factory for creating new threads. For default value,
     * use {@link #defaultForkJoinWorkerThreadFactory}.
     * @param handler the handler for internal worker threads that
     * terminate due to unrecoverable errors encountered while executing
     * tasks. For default value, use {@code null}.
     * @param asyncMode if true,
     * establishes local first-in-first-out scheduling mode for forked
     * tasks that are never joined. This mode may be more appropriate
     * than default locally stack-based mode in applications in which
     * worker threads only process event-style asynchronous tasks.
     * For default value, use {@code false}.
* @throws IllegalArgumentException if parallelism less than or
     *         equal to zero, or greater than implementation limit
     * @throws NullPointerException if the factory is null
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    this(int parallelism,
            ForkJoinWorkerThreadFactory factory,
            UncaughtExceptionHandler handler,
            bool asyncMode) {
        this(parallelism,
             factory,
             handler,
             asyncMode,
             0,       // corePoolSize
             MAX_CAP, // maximumPoolSize
             1,       // minimumRunnable
             null,    // saturate
             dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE));
    }

    /**
     * Creates a {@code ForkJoinPool} with the given parameters.
     *
     * @param parallelism the parallelism level. For default value,
     * use {@link java.lang.Runtime#availableProcessors}.
     *
     * @param factory the factory for creating new threads. For
     * default value, use {@link #defaultForkJoinWorkerThreadFactory}.
     *
     * @param handler the handler for internal worker threads that
     * terminate due to unrecoverable errors encountered while
     * executing tasks. For default value, use {@code null}.
     *
     * @param asyncMode if true, establishes local first-in-first-out
     * scheduling mode for forked tasks that are never joined. This
     * mode may be more appropriate than default locally stack-based
     * mode in applications in which worker threads only process
     * event-style asynchronous tasks. For default value, use {@code
     * false}.
     *
     * @param corePoolSize the number of threads to keep in the pool
     * (unless timed out after an elapsed keep-alive). Normally (and
     * by default) this is the same value as the parallelism level,
     * but may be set to a larger value to reduce dynamic overhead if
     * tasks regularly block. Using a smaller value (for example
     * {@code 0}) has the same effect as the default.
1865 * 1866 * @param maximumPoolSize the maximum number of threads allowed. 1867 * When the maximum is reached, attempts to replace blocked 1868 * threads fail. (However, because creation and termination of 1869 * different threads may overlap, and may be managed by the given 1870 * thread factory, this value may be transiently exceeded.) To 1871 * arrange the same value as is used by default for the common 1872 * pool, use {@code 256} plus the {@code parallelism} level. (By 1873 * default, the common pool allows a maximum of 256 spare 1874 * threads.) Using a value (for example {@code 1875 * Integer.MAX_VALUE}) larger than the implementation's total 1876 * thread limit has the same effect as using this limit (which is 1877 * the default). 1878 * 1879 * @param minimumRunnable the minimum allowed number of core 1880 * threads not blocked by a join or {@link ManagedBlocker}. To 1881 * ensure progress, when too few unblocked threads exist and 1882 * unexecuted tasks may exist, new threads are constructed, up to 1883 * the given maximumPoolSize. For the default value, use {@code 1884 * 1}, that ensures liveness. A larger value might improve 1885 * throughput in the presence of blocked activities, but might 1886 * not, due to increased overhead. A value of zero may be 1887 * acceptable when submitted tasks cannot have dependencies 1888 * requiring additional threads. 1889 * 1890 * @param saturate if non-null, a predicate invoked upon attempts 1891 * to create more than the maximum total allowed threads. By 1892 * default, when a thread is about to block on a join or {@link 1893 * ManagedBlocker}, but cannot be replaced because the 1894 * maximumPoolSize would be exceeded, a {@link 1895 * RejectedExecutionException} is thrown. But if this predicate 1896 * returns {@code true}, then no exception is thrown, so the pool 1897 * continues to operate with fewer than the target number of 1898 * runnable threads, which might not ensure progress. 
*
     * @param keepAliveTime the elapsed time since last use before
     * a thread is terminated (and then later replaced if needed),
     * given as a {@code Duration}. The convenience constructors pass
     * {@code DEFAULT_KEEPALIVE} milliseconds.
     *
     * @throws IllegalArgumentException if parallelism is less than or
     *         equal to zero, or is greater than implementation limit,
     *         or if maximumPoolSize is less than parallelism,
     *         or if the keepAliveTime is less than or equal to zero.
     * @throws NullPointerException if the factory is null
     */
    this(int parallelism,
            ForkJoinWorkerThreadFactory factory,
            UncaughtExceptionHandler handler,
            bool asyncMode,
            int corePoolSize,
            int maximumPoolSize,
            int minimumRunnable,
            Predicate!(ForkJoinPool) saturate,
            Duration keepAliveTime) {
        // check, encode, pack parameters
        if (parallelism <= 0 || parallelism > MAX_CAP)
            throw new IllegalArgumentException();
        if (maximumPoolSize < parallelism || keepAliveTime <= Duration.zero)
            throw new IllegalArgumentException();
        if (factory is null)
            throw new NullPointerException();

        // Keep-alive is clamped from below by TIMEOUT_SLOP.
        long keepAliveMillis = max(keepAliveTime.total!(TimeUnit.Millisecond), TIMEOUT_SLOP);

        int coreThreads = min(max(corePoolSize, parallelism), MAX_CAP);
        long tcBits = (cast(long)(-coreThreads) << TC_SHIFT) & TC_MASK;
        long rcBits = (cast(long)(-parallelism) << RC_SHIFT) & RC_MASK;
        int spareLimit = min(maximumPoolSize, MAX_CAP) - parallelism;
        int minAvail = min(max(minimumRunnable, 0), MAX_CAP);

        // Queue array size: a power of two, including space for submission queues.
        int slots = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots
        for (int shift = 1; shift <= 16; shift <<= 1)
            slots |= slots >>> shift;
        slots = (slots + 1) << 1;

        this.workerNamePrefix = "ForkJoinPool-" ~ nextPoolId().to!string() ~ "-worker-";
        this.workQueues = new WorkQueue[slots];
        this.factory = factory;
        this.ueh = handler;
        this.saturate = saturate;
        this.keepAlive = keepAliveMillis.msecs;
        this.bounds = ((minAvail - parallelism) & SMASK) | (spareLimit << SWIDTH);
        this.mode = parallelism | (asyncMode ? FIFO : 0);
        this.ctl = tcBits | rcBits;
        // checkPermission();
        workerNameLocker = new Object();
    }

    // Instantiates the named class through Object.factory; an empty name
    // yields null.
    private static Object newInstanceFrom(string className) {
        if (className.empty)
            return null;
        return Object.factory(className);
    }

    /**
     * Constructor for common pool using parameters possibly
     * overridden by system properties
     */
    private this(byte forCommonPoolOnly) {
        int parallelism = -1;
        ForkJoinWorkerThreadFactory fac = null;
        UncaughtExceptionHandler handler = null;
        try { // ignore exceptions in accessing/parsing properties
            ConfigBuilder props = Environment.getProperties();
            string configured = props.getProperty("hunt.concurrency.ForkJoinPool.common.parallelism");
            if (!configured.empty)
                parallelism = configured.to!int();

            fac = cast(ForkJoinWorkerThreadFactory) newInstanceFrom(
                    props.getProperty("hunt.concurrency.ForkJoinPool.common.threadFactory"));
            handler = cast(UncaughtExceptionHandler) newInstanceFrom(
                    props.getProperty("hunt.concurrency.ForkJoinPool.common.exceptionHandler"));
        } catch (Exception ignore) {
            version(HUNT_DEBUG) {
                warning(ignore);
            }
        }

        // No usable factory configured: fall back to the default one.
        if (fac is null)
            fac = defaultForkJoinWorkerThreadFactory;

        if (parallelism < 0) { // default 1 less than #cores
            parallelism = totalCPUs - 1;
            if (parallelism <= 0)
                parallelism = 1;
        }
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;

        long tcBits = (cast(long)(-parallelism) << TC_SHIFT) & TC_MASK;
        long rcBits = (cast(long)(-parallelism) << RC_SHIFT) & RC_MASK;

        // Same power-of-two sizing as the public constructor.
        int slots = (parallelism > 1) ? parallelism - 1 : 1;
        for (int shift = 1; shift <= 16; shift <<= 1)
            slots |= slots >>> shift;
        slots = (slots + 1) << 1;

        this.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
        this.workerNameLocker = new Object();
        this.workQueues = new WorkQueue[slots];
        this.factory = fac;
        this.ueh = handler;
        this.saturate = null;
        this.keepAlive = DEFAULT_KEEPALIVE.msecs;
        this.bounds = ((1 - parallelism) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
        this.mode = parallelism;
        this.ctl = tcBits | rcBits;
    }

    /**
     * Returns the common pool instance. This pool is statically
     * constructed; its run state is unaffected by attempts to {@link
     * #shutdown} or {@link #shutdownNow}. However this pool and any
     * ongoing processing are automatically terminated upon program
     * {@link System#exit}. Any program that relies on asynchronous
     * task processing to complete before program termination should
     * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
     * before exit.
     *
     * @return the common pool instance
     */
    static ForkJoinPool commonPool() {
        return common;
    }

    // Execution methods

    /**
     * Performs the given task, returning its result upon completion.
     * If the computation encounters an unchecked Exception or Error,
     * it is rethrown as the outcome of this invocation.
* Rethrown exceptions behave in the same way as regular exceptions, but,
     * when possible, contain stack traces (as displayed for example
     * using {@code ex.printStackTrace()}) of both the current thread
     * as well as the thread actually encountering the exception;
     * minimally only the latter.
     *
     * @param task the task
     * @param (T) the type of the task's result
     * @return the task's result
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    T invoke(T)(ForkJoinTask!(T) task) {
        if (task is null)
            throw new NullPointerException();
        externalSubmit!(T)(task);
        version(HUNT_CONCURRENCY_DEBUG) {
            infof("waiting the result...");
            static if (is(T == void)) {
                // A void task has no value to capture or log.
                task.join();
            } else {
                T c = task.join();
                infof("final result: %s", c);
                return c;
            }
        } else {
            return task.join();
        }
    }

    /**
     * Arranges for (asynchronous) execution of the given task.
     *
     * @param task the task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    void execute(IForkJoinTask task) {
        externalSubmit(task);
    }

    // AbstractExecutorService methods

    /**
     * Arranges for (asynchronous) execution of the given runnable. A
     * runnable that already is an {@code IForkJoinTask} is submitted as
     * is; any other runnable is wrapped in a {@code RunnableExecuteAction}.
     *
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    void execute(Runnable task) {
        if (task is null)
            throw new NullPointerException();
        IForkJoinTask job = cast(IForkJoinTask) task; // avoid re-wrap
        if (job is null) {
            // Wrapping is the normal path for plain runnables, so it is only
            // reported in debug builds rather than as a warning.
            version(HUNT_CONCURRENCY_DEBUG) {
                infof("wrapping a plain Runnable in RunnableExecuteAction");
            }
            job = new RunnableExecuteAction(task);
        }
        externalSubmit(job);
    }

    /**
     * Submits a ForkJoinTask for execution.
*
     * @param task the task to submit
     * @param (T) the type of the task's result
     * @return the task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    ForkJoinTask!(T) submitTask(T)(ForkJoinTask!(T) task) {
        return externalSubmit(task);
    }

    /**
     * Wraps the callable in an {@code AdaptedCallable} and submits it.
     *
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    ForkJoinTask!(T) submitTask(T)(Callable!(T) task) {
        return externalSubmit(new AdaptedCallable!(T)(task));
    }

    /**
     * Wraps the runnable and its result in an {@code AdaptedRunnable}
     * and submits it.
     *
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    ForkJoinTask!(T) submitTask(T)(Runnable task, T result) {
        return externalSubmit(new AdaptedRunnable!(T)(task, result));
    }

    /**
     * Submits a runnable. A runnable that already is an
     * {@code IForkJoinTask} is submitted as is, whatever its result
     * type; any other runnable is wrapped in an
     * {@code AdaptedRunnableAction}.
     *
     * @return the submitted task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    IForkJoinTask submitTask(Runnable task) {
        if (task is null)
            throw new NullPointerException();
        // Submit through the IForkJoinTask overload. Casting an existing task
        // to ForkJoinTask!(void) yields null for tasks of any other result
        // type, which was then rejected as a null task.
        IForkJoinTask job = cast(IForkJoinTask) task; // avoid re-wrap
        if (job is null)
            job = new AdaptedRunnableAction(task);
        return externalSubmit(job);
    }

    /**
     * Submits every callable, waits for all of them with quietlyJoin and
     * returns their futures. If anything is thrown on the way, every
     * future created so far is cancelled and the throwable is rethrown.
     *
     * @throws NullPointerException {@inheritDoc}
     * @throws RejectedExecutionException {@inheritDoc}
     */
    List!(Future!(T)) invokeAll(T)(Collection!(Callable!(T)) tasks) {
        // In previous versions of this class, this method constructed
        // a task to run ForkJoinTask.invokeAll, but now external
        // invocation of multiple tasks is at least as efficient.
        // NOTE(review): ArrayList is not among the imports visible at the top
        // of this file (its import is commented out) - confirm it is in scope.
        ArrayList!(Future!(T)) futures = new ArrayList!(Future!(T))(tasks.size());

        try {
            foreach (Callable!(T) t ; tasks) {
                ForkJoinTask!(T) f = new AdaptedCallable!(T)(t);
                futures.add(f);
                externalSubmit(f);
            }
            for (int i = 0, size = futures.size(); i < size; i++)
                (cast(IForkJoinTask)(futures.get(i))).quietlyJoin();
            return futures;
        } catch (Throwable t) {
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(false);
            throw t;
        }
    }

    /**
     * Returns the factory used for constructing new workers.
     *
     * @return the factory used for constructing new workers
     */
    ForkJoinWorkerThreadFactory getFactory() {
        return factory;
    }

    /**
     * Returns the handler for internal worker threads that terminate
     * due to unrecoverable errors encountered while executing tasks.
     *
     * @return the handler, or {@code null} if none
     */
    UncaughtExceptionHandler getUncaughtExceptionHandler() {
        return ueh;
    }

    /**
     * Returns the targeted parallelism level of this pool.
     *
     * @return the targeted parallelism level of this pool (at least 1)
     */
    int getParallelism() {
        int par = mode & SMASK;
        return (par > 0) ? par : 1;
    }

    /**
     * Returns the targeted parallelism level of the common pool.
     *
     * @return the targeted parallelism level of the common pool
     */
    static int getCommonPoolParallelism() {
        return COMMON_PARALLELISM;
    }

    /**
     * Returns the number of worker threads that have started but not
     * yet terminated. The result returned by this method may differ
     * from {@link #getParallelism} when threads are created to
     * maintain parallelism when others are cooperatively blocked.
*
     * @return the number of worker threads
     */
    int getPoolSize() {
        int par = mode & SMASK;
        short totalDelta = cast(short)(ctl >>> TC_SHIFT);
        return par + totalDelta;
    }

    /**
     * Returns {@code true} if this pool uses local first-in-first-out
     * scheduling mode for forked tasks that are never joined.
     *
     * @return {@code true} if this pool uses async mode
     */
    bool getAsyncMode() {
        int fifoBit = mode & FIFO;
        return fifoBit != 0;
    }

    /**
     * Returns an estimate of the number of worker threads that are
     * not blocked waiting to join tasks or for other managed
     * synchronization. This method may overestimate the
     * number of running threads.
     *
     * @return the number of worker threads
     */
    int getRunningThreadCount() {
        // VarHandle.acquireFence();
        WorkQueue[] queues = workQueues;
        int running = 0;
        // Only the odd slots are inspected.
        for (size_t slot = 1; slot < queues.length; slot += 2) {
            WorkQueue worker = queues[slot];
            if (worker is null)
                continue;
            if (worker.isApparentlyUnblocked())
                ++running;
        }
        return running;
    }

    /**
     * Returns an estimate of the number of threads that are currently
     * stealing or executing tasks. This method may overestimate the
     * number of active threads.
     *
     * @return the number of active threads
     */
    int getActiveThreadCount() {
        int active = (mode & SMASK) + cast(int)(ctl >> RC_SHIFT);
        if (active <= 0)
            return 0; // suppress momentarily negative values
        return active;
    }

    /**
     * Returns {@code true} if all worker threads are currently idle.
     * An idle worker is one that cannot obtain a task to execute
     * because none are available to steal from other threads, and
     * there are no pending submissions to the pool. This method is
     * conservative; it might not return {@code true} immediately upon
     * idleness of all threads, but will eventually become true if
     * threads remain inactive.
*
     * @return {@code true} if all threads are currently idle
     */
    bool isQuiescent() {
        while (true) {
            long snapshot = ctl;
            int runState = mode;
            int par = runState & SMASK;
            int total = par + cast(short)(snapshot >>> TC_SHIFT);
            int active = par + cast(int)(snapshot >> RC_SHIFT);

            if ((runState & (STOP | TERMINATED)) != 0)
                return true;
            if (active > 0)
                return false;

            // Walk the odd slots: a positive source means not idle; every
            // queue found is counted off the total.
            WorkQueue[] queues = workQueues;
            for (size_t slot = 1; slot < queues.length; slot += 2) {
                WorkQueue worker = queues[slot];
                if (worker is null)
                    continue;
                if (worker.source > 0)
                    return false;
                --total;
            }

            // Accept only if every counted thread was seen and ctl is
            // unchanged since the snapshot; otherwise retry.
            if (total == 0 && ctl == snapshot)
                return true;
        }
    }

    /**
     * Returns an estimate of the total number of tasks stolen from
     * one thread's work queue by another. The reported value
     * underestimates the actual total number of steals when the pool
     * is not quiescent. This value may be useful for monitoring and
     * tuning fork/join programs: in general, steal counts should be
     * high enough to keep threads busy, but low enough to avoid
     * overhead and contention across threads.
     *
     * @return the number of steals
     */
    long getStealCount() {
        long total = stealCount;
        WorkQueue[] queues = workQueues;
        for (size_t slot = 1; slot < queues.length; slot += 2) {
            WorkQueue worker = queues[slot];
            if (worker !is null)
                total += cast(long)worker.nsteals & 0xffffffffL; // masked to the low 32 bits
        }
        return total;
    }

    /**
     * Returns an estimate of the total number of tasks currently held
     * in queues by worker threads (but not including tasks submitted
     * to the pool that have not begun executing). This value is only
     * an approximation, obtained by iterating across all threads in
     * the pool. This method may be useful for tuning task
     * granularities.
*
     * @return the number of queued tasks
     */
    long getQueuedTaskCount() {
        // VarHandle.acquireFence();
        WorkQueue[] queues = workQueues;
        int queued = 0;
        // Only the odd slots are summed.
        for (size_t slot = 1; slot < queues.length; slot += 2) {
            WorkQueue worker = queues[slot];
            if (worker !is null)
                queued += worker.queueSize();
        }
        return queued;
    }

    /**
     * Returns an estimate of the number of tasks submitted to this
     * pool that have not yet begun executing. This method may take
     * time proportional to the number of submissions.
     *
     * @return the number of queued submissions
     */
    int getQueuedSubmissionCount() {
        // VarHandle.acquireFence();
        WorkQueue[] queues = workQueues;
        int queued = 0;
        // Only the even slots are summed.
        for (size_t slot = 0; slot < queues.length; slot += 2) {
            WorkQueue submitter = queues[slot];
            if (submitter !is null)
                queued += submitter.queueSize();
        }
        return queued;
    }

    /**
     * Returns {@code true} if there are any tasks submitted to this
     * pool that have not yet begun executing.
     *
     * @return {@code true} if there are any queued submissions
     */
    bool hasQueuedSubmissions() {
        // VarHandle.acquireFence();
        WorkQueue[] queues = workQueues;
        for (size_t slot = 0; slot < queues.length; slot += 2) {
            WorkQueue submitter = queues[slot];
            if (submitter is null)
                continue;
            if (!submitter.isEmpty())
                return true;
        }
        return false;
    }

    /**
     * Removes and returns the next unexecuted submission if one is
     * available. This method may be useful in extensions to this
     * class that re-assign work in systems with multiple pools.
*
     * @return the next submission, or {@code null} if none
     */
    protected IForkJoinTask pollSubmission() {
        IForkJoinTask next = pollScan(true);
        return next;
    }

    /**
     * Removes all available unexecuted submitted and forked tasks
     * from scheduling queues and adds them to the given collection,
     * without altering their execution status. These may include
     * artificially generated or wrapped tasks. This method is
     * designed to be invoked only when the pool is known to be
     * quiescent. Invocations at other times may not remove all
     * tasks. A failure encountered while attempting to add elements
     * to collection {@code c} may result in elements being in
     * neither, either or both collections when the associated
     * exception is thrown. The behavior of this operation is
     * undefined if the specified collection is modified while the
     * operation is in progress.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     */
    protected int drainTasksTo(Collection!IForkJoinTask c) {
        // VarHandle.acquireFence();
        WorkQueue[] queues = workQueues;
        int moved = 0;
        foreach (WorkQueue queue; queues) {
            if (queue is null)
                continue;
            // Poll this queue until it reports nothing left.
            for (IForkJoinTask task = queue.poll(); task !is null; task = queue.poll()) {
                c.add(task);
                ++moved;
            }
        }
        return moved;
    }

    /**
     * Returns a string identifying this pool, as well as its state,
     * including indications of run state, parallelism level, and
     * worker and task counts.
*
     * @return a string identifying this pool, as well as its state
     */
    override string toString() {
        // Use a single pass through workQueues to collect counts
        int runState = mode; // read fields first
        long control = ctl;
        long steals = stealCount;
        long queuedTasks = 0L, queuedSubmissions = 0L;
        int running = 0;

        WorkQueue[] queues = workQueues;
        foreach (size_t slot, WorkQueue queue; queues) {
            if (queue is null)
                continue;
            int size = queue.queueSize();
            if ((slot & 1) == 0) {
                // even slots are reported as submissions
                queuedSubmissions += size;
                continue;
            }
            // odd slots contribute tasks, steals and the running count
            queuedTasks += size;
            steals += cast(long)queue.nsteals & 0xffffffffL;
            if (queue.isApparentlyUnblocked())
                ++running;
        }

        int par = runState & SMASK;
        int total = par + cast(short)(control >>> TC_SHIFT);
        int active = par + cast(int)(control >> RC_SHIFT);
        if (active < 0) // ignore negative
            active = 0;

        string level;
        if ((runState & TERMINATED) != 0)
            level = "Terminated";
        else if ((runState & STOP) != 0)
            level = "Terminating";
        else if ((runState & SHUTDOWN) != 0)
            level = "Shutting down";
        else
            level = "Running";

        string[] fields = [
            level,
            "parallelism = " ~ par.to!string(),
            "size = " ~ total.to!string(),
            "active = " ~ active.to!string(),
            "running = " ~ running.to!string(),
            "steals = " ~ steals.to!string(),
            "tasks = " ~ queuedTasks.to!string(),
            "submissions = " ~ queuedSubmissions.to!string()
        ];
        return super.toString() ~ "[" ~ fields.join(", ") ~ "]";
    }

    /**
     * Possibly initiates an orderly shutdown in which previously
     * submitted tasks are executed, but no new tasks will be
     * accepted. Invocation has no effect on execution state if this
     * is the {@link #commonPool()}, and no additional effect if
     * already shut down. Tasks that are in the process of being
     * submitted concurrently during the course of this method may or
     * may not be rejected.
2489 * 2490 * @throws SecurityException if a security manager exists and 2491 * the caller is not permitted to modify threads 2492 * because it does not hold {@link 2493 * java.lang.RuntimePermission}{@code ("modifyThread")} 2494 */ 2495 void shutdown() { 2496 // checkPermission(); 2497 tryTerminate(false, true); 2498 } 2499 2500 /** 2501 * Possibly attempts to cancel and/or stop all tasks, and reject 2502 * all subsequently submitted tasks. Invocation has no effect on 2503 * execution state if this is the {@link #commonPool()}, and no 2504 * additional effect if already shut down. Otherwise, tasks that 2505 * are in the process of being submitted or executed concurrently 2506 * during the course of this method may or may not be 2507 * rejected. This method cancels both existing and unexecuted 2508 * tasks, in order to permit termination in the presence of task 2509 * dependencies. So the method always returns an empty list 2510 * (unlike the case for some other Executors). 2511 * 2512 * @return an empty list 2513 * @throws SecurityException if a security manager exists and 2514 * the caller is not permitted to modify threads 2515 * because it does not hold {@link 2516 * java.lang.RuntimePermission}{@code ("modifyThread")} 2517 */ 2518 List!(Runnable) shutdownNow() { 2519 // checkPermission(); 2520 tryTerminate(true, true); 2521 return Collections.emptyList!(Runnable)(); 2522 } 2523 2524 /** 2525 * Returns {@code true} if all tasks have completed following shut down. 2526 * 2527 * @return {@code true} if all tasks have completed following shut down 2528 */ 2529 bool isTerminated() { 2530 return (mode & TERMINATED) != 0; 2531 } 2532 2533 /** 2534 * Returns {@code true} if the process of termination has 2535 * commenced but not yet completed. This method may be useful for 2536 * debugging. 
A return of {@code true} reported a sufficient 2537 * period after shutdown may indicate that submitted tasks have 2538 * ignored or suppressed interruption, or are waiting for I/O, 2539 * causing this executor not to properly terminate. (See the 2540 * advisory notes for class {@link ForkJoinTask} stating that 2541 * tasks should not normally entail blocking operations. But if 2542 * they do, they must abort them on interrupt.) 2543 * 2544 * @return {@code true} if terminating but not yet terminated 2545 */ 2546 bool isTerminating() { 2547 int md = mode; 2548 return (md & STOP) != 0 && (md & TERMINATED) == 0; 2549 } 2550 2551 /** 2552 * Returns {@code true} if this pool has been shut down. 2553 * 2554 * @return {@code true} if this pool has been shut down 2555 */ 2556 bool isShutdown() { 2557 return (mode & SHUTDOWN) != 0; 2558 } 2559 2560 /** 2561 * Blocks until all tasks have completed execution after a 2562 * shutdown request, or the timeout occurs, or the current thread 2563 * is interrupted, whichever happens first. Because the {@link 2564 * #commonPool()} never terminates until program shutdown, when 2565 * applied to the common pool, this method is equivalent to {@link 2566 * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}. 
2567 * 2568 * @param timeout the maximum time to wait 2569 * @param unit the time unit of the timeout argument 2570 * @return {@code true} if this executor terminated and 2571 * {@code false} if the timeout elapsed before termination 2572 * @throws InterruptedException if interrupted while waiting 2573 */ 2574 bool awaitTermination(Duration timeout) 2575 { 2576 if (ThreadEx.interrupted()) 2577 throw new InterruptedException(); 2578 if (this == common) { 2579 awaitQuiescence(timeout); 2580 return false; 2581 } 2582 long nanos = timeout.total!(TimeUnit.HectoNanosecond); 2583 if (isTerminated()) 2584 return true; 2585 if (nanos <= 0L) 2586 return false; 2587 long deadline = Clock.currStdTime + nanos; 2588 synchronized (this) { 2589 for (;;) { 2590 if (isTerminated()) 2591 return true; 2592 if (nanos <= 0L) 2593 return false; 2594 // long millis = TimeUnit.NANOSECONDS.toMillis(nanos); 2595 // wait(millis > 0L ? millis : 1L); 2596 // ThreadEx.currentThread().par 2597 ThreadEx.sleep(dur!(TimeUnit.HectoNanosecond)(nanos)); 2598 nanos = deadline - Clock.currStdTime; 2599 } 2600 } 2601 } 2602 2603 /** 2604 * If called by a ForkJoinTask operating in this pool, equivalent 2605 * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise, 2606 * waits and/or attempts to assist performing tasks until this 2607 * pool {@link #isQuiescent} or the indicated timeout elapses. 2608 * 2609 * @param timeout the maximum time to wait 2610 * @param unit the time unit of the timeout argument 2611 * @return {@code true} if quiescent; {@code false} if the 2612 * timeout elapsed. 
2613 */ 2614 bool awaitQuiescence(Duration timeout) { 2615 long nanos = timeout.total!(TimeUnit.HectoNanosecond)(); 2616 Thread thread = Thread.getThis(); 2617 ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)thread; 2618 if (wt !is null && wt.pool is this) { 2619 helpQuiescePool(wt.workQueue); 2620 return true; 2621 } 2622 else { 2623 for (long startTime = Clock.currStdTime;;) { 2624 IForkJoinTask t; 2625 if ((t = pollScan(false)) !is null) 2626 t.doExec(); 2627 else if (isQuiescent()) 2628 return true; 2629 else if ((Clock.currStdTime - startTime) > nanos) 2630 return false; 2631 else 2632 Thread.yield(); // cannot block 2633 } 2634 } 2635 } 2636 2637 /** 2638 * Waits and/or attempts to assist performing tasks indefinitely 2639 * until the {@link #commonPool()} {@link #isQuiescent}. 2640 */ 2641 static void quiesceCommonPool() { 2642 common.awaitQuiescence(Duration.max); 2643 } 2644 2645 /** 2646 * Runs the given possibly blocking task. When {@linkplain 2647 * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this 2648 * method possibly arranges for a spare thread to be activated if 2649 * necessary to ensure sufficient parallelism while the current 2650 * thread is blocked in {@link ManagedBlocker#block blocker.block()}. 2651 * 2652 * <p>This method repeatedly calls {@code blocker.isReleasable()} and 2653 * {@code blocker.block()} until either method returns {@code true}. 2654 * Every call to {@code blocker.block()} is preceded by a call to 2655 * {@code blocker.isReleasable()} that returned {@code false}. 2656 * 2657 * <p>If not running in a ForkJoinPool, this method is 2658 * behaviorally equivalent to 2659 * <pre> {@code 2660 * while (!blocker.isReleasable()) 2661 * if (blocker.block()) 2662 * break;}</pre> 2663 * 2664 * If running in a ForkJoinPool, the pool may first be expanded to 2665 * ensure sufficient parallelism available during the call to 2666 * {@code blocker.block()}. 
2667 * 2668 * @param blocker the blocker task 2669 * @throws InterruptedException if {@code blocker.block()} did so 2670 */ 2671 static void managedBlock(ManagedBlocker blocker) { 2672 if (blocker is null) throw new NullPointerException(); 2673 ForkJoinPool p; 2674 WorkQueue w; 2675 Thread t = Thread.getThis(); 2676 ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)t; 2677 if (wt !is null && (p = wt.pool) !is null && 2678 (w = wt.workQueue) !is null) { 2679 int block; 2680 while (!blocker.isReleasable()) { 2681 if ((block = p.tryCompensate(w)) != 0) { 2682 try { 2683 do {} while (!blocker.isReleasable() && 2684 !blocker.block()); 2685 } finally { 2686 AtomicHelper.getAndAdd(p.ctl, (block > 0) ? RC_UNIT : 0L); 2687 } 2688 break; 2689 } 2690 } 2691 } 2692 else { 2693 do {} while (!blocker.isReleasable() && 2694 !blocker.block()); 2695 } 2696 } 2697 2698 /** 2699 * If the given executor is a ForkJoinPool, poll and execute 2700 * AsynchronousCompletionTasks from worker's queue until none are 2701 * available or blocker is released. 2702 */ 2703 static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) { 2704 ForkJoinPool p = cast(ForkJoinPool)e; 2705 if (p !is null) { 2706 WorkQueue w; WorkQueue[] ws; int r, n; 2707 2708 Thread thread = Thread.getThis(); 2709 ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)thread; 2710 if (wt !is null && wt.pool is p) 2711 w = wt.workQueue; 2712 else if ((r = ThreadLocalRandom.getProbe()) != 0 && 2713 (ws = p.workQueues) !is null && (n = cast(int)ws.length) > 0) 2714 w = ws[(n - 1) & r & SQMASK]; 2715 else 2716 w = null; 2717 if (w !is null) 2718 w.helpAsyncBlocker(blocker); 2719 } 2720 } 2721 2722 // AbstractExecutorService overrides. These rely on undocumented 2723 // fact that ForkJoinTask.adapt returns ForkJoinTasks that also 2724 // implement RunnableFuture. 
2725 2726 protected RunnableFuture!(T) newTaskFor(T)(Runnable runnable, T value) { 2727 return new AdaptedRunnable!(T)(runnable, value); 2728 } 2729 2730 protected RunnableFuture!(T) newTaskFor(T)(Callable!(T) callable) { 2731 return new AdaptedCallable!(T)(callable); 2732 } 2733 2734 shared static this() { 2735 int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES; 2736 try { 2737 ConfigBuilder config = Environment.getProperties(); 2738 string p = config.getProperty 2739 ("hunt.concurrency.ForkJoinPool.common.maximumSpares"); 2740 if (!p.empty()) 2741 commonMaxSpares = p.to!int(); 2742 } catch (Exception ignore) { 2743 version(HUNT_DEBUG) warning(ignore.toString()); 2744 } 2745 COMMON_MAX_SPARES = commonMaxSpares; 2746 2747 defaultForkJoinWorkerThreadFactory = 2748 new DefaultForkJoinWorkerThreadFactory(); 2749 common = new ForkJoinPool(cast(byte)0); 2750 COMMON_PARALLELISM = max(common.mode & SMASK, 1); 2751 } 2752 } 2753 2754 2755 /** 2756 * Factory for innocuous worker threads. 2757 */ 2758 private final class InnocuousForkJoinWorkerThreadFactory 2759 : ForkJoinWorkerThreadFactory { 2760 2761 /** 2762 * An ACC to restrict permissions for the factory itself. 2763 * The constructed workers have no permissions set. 2764 */ 2765 // private static final AccessControlContext ACC = contextWithPermissions( 2766 // modifyThreadPermission, 2767 // new RuntimePermission("enableContextClassLoaderOverride"), 2768 // new RuntimePermission("modifyThreadGroup"), 2769 // new RuntimePermission("getClassLoader"), 2770 // new RuntimePermission("setContextClassLoader")); 2771 2772 final ForkJoinWorkerThread newThread(ForkJoinPool pool) { 2773 return new InnocuousForkJoinWorkerThread(pool); 2774 // return AccessController.doPrivileged( 2775 // new PrivilegedAction<>() { 2776 // ForkJoinWorkerThread run() { 2777 // return new ForkJoinWorkerThread. 
2778 // InnocuousForkJoinWorkerThread(pool); }}, 2779 // ACC); 2780 } 2781 } 2782 2783 2784 /** 2785 * Factory for creating new {@link ForkJoinWorkerThread}s. 2786 * A {@code ForkJoinWorkerThreadFactory} must be defined and used 2787 * for {@code ForkJoinWorkerThread} subclasses that extend base 2788 * functionality or initialize threads with different contexts. 2789 */ 2790 interface ForkJoinWorkerThreadFactory { 2791 /** 2792 * Returns a new worker thread operating in the given pool. 2793 * Returning null or throwing an exception may result in tasks 2794 * never being executed. If this method throws an exception, 2795 * it is relayed to the caller of the method (for example 2796 * {@code execute}) causing attempted thread creation. If this 2797 * method returns null or throws an exception, it is not 2798 * retried until the next attempted creation (for example 2799 * another call to {@code execute}). 2800 * 2801 * @param pool the pool this thread works in 2802 * @return the new worker thread, or {@code null} if the request 2803 * to create a thread is rejected 2804 * @throws NullPointerException if the pool is null 2805 */ 2806 ForkJoinWorkerThread newThread(ForkJoinPool pool); 2807 } 2808 2809 // Nested classes 2810 2811 // static AccessControlContext contextWithPermissions(Permission ... perms) { 2812 // Permissions permissions = new Permissions(); 2813 // for (Permission perm : perms) 2814 // permissions.add(perm); 2815 // return new AccessControlContext( 2816 // new ProtectionDomain[] { new ProtectionDomain(null, permissions) }); 2817 // } 2818 2819 /** 2820 * Default ForkJoinWorkerThreadFactory implementation; creates a 2821 * new ForkJoinWorkerThread using the system class loader as the 2822 * thread context class loader. 
2823 */ 2824 private final class DefaultForkJoinWorkerThreadFactory : ForkJoinWorkerThreadFactory { 2825 // private static final AccessControlContext ACC = contextWithPermissions( 2826 // new RuntimePermission("getClassLoader"), 2827 // new RuntimePermission("setContextClassLoader")); 2828 2829 final ForkJoinWorkerThread newThread(ForkJoinPool pool) { 2830 return new ForkJoinWorkerThread(pool); 2831 // return AccessController.doPrivileged( 2832 // new PrivilegedAction<>() { 2833 // ForkJoinWorkerThread run() { 2834 // return new ForkJoinWorkerThread( 2835 // pool, ClassLoader.getSystemClassLoader()); }}, 2836 // ACC); 2837 } 2838 } 2839 2840 2841 /** 2842 * Interface for extending managed parallelism for tasks running 2843 * in {@link ForkJoinPool}s. 2844 * 2845 * <p>A {@code ManagedBlocker} provides two methods. Method 2846 * {@link #isReleasable} must return {@code true} if blocking is 2847 * not necessary. Method {@link #block} blocks the current thread 2848 * if necessary (perhaps internally invoking {@code isReleasable} 2849 * before actually blocking). These actions are performed by any 2850 * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}. 2851 * The unusual methods in this API accommodate synchronizers that 2852 * may, but don't usually, block for long periods. Similarly, they 2853 * allow more efficient internal handling of cases in which 2854 * additional workers may be, but usually are not, needed to 2855 * ensure sufficient parallelism. Toward this end, 2856 * implementations of method {@code isReleasable} must be amenable 2857 * to repeated invocation. 
2858 * 2859 * <p>For example, here is a ManagedBlocker based on a 2860 * ReentrantLock: 2861 * <pre> {@code 2862 * class ManagedLocker implements ManagedBlocker { 2863 * final ReentrantLock lock; 2864 * bool hasLock = false; 2865 * ManagedLocker(ReentrantLock lock) { this.lock = lock; } 2866 * bool block() { 2867 * if (!hasLock) 2868 * lock.lock(); 2869 * return true; 2870 * } 2871 * bool isReleasable() { 2872 * return hasLock || (hasLock = lock.tryLock()); 2873 * } 2874 * }}</pre> 2875 * 2876 * <p>Here is a class that possibly blocks waiting for an 2877 * item on a given queue: 2878 * <pre> {@code 2879 * class QueueTaker!(E) : ManagedBlocker { 2880 * final BlockingQueue!(E) queue; 2881 * E item = null; 2882 * QueueTaker(BlockingQueue!(E) q) { this.queue = q; } 2883 * bool block() { 2884 * if (item is null) 2885 * item = queue.take(); 2886 * return true; 2887 * } 2888 * bool isReleasable() { 2889 * return item !is null || (item = queue.poll()) !is null; 2890 * } 2891 * E getItem() { // call after pool.managedBlock completes 2892 * return item; 2893 * } 2894 * }}</pre> 2895 */ 2896 static interface ManagedBlocker { 2897 /** 2898 * Possibly blocks the current thread, for example waiting for 2899 * a lock or condition. 2900 * 2901 * @return {@code true} if no additional blocking is necessary 2902 * (i.e., if isReleasable would return true) 2903 * @throws InterruptedException if interrupted while waiting 2904 * (the method is not required to do so, but is allowed to) 2905 */ 2906 bool block(); 2907 2908 /** 2909 * Returns {@code true} if blocking is unnecessary. 2910 * @return {@code true} if blocking is unnecessary 2911 */ 2912 bool isReleasable(); 2913 } 2914 2915 2916 2917 /** 2918 * A thread managed by a {@link ForkJoinPool}, which executes 2919 * {@link ForkJoinTask}s. 2920 * This class is subclassable solely for the sake of adding 2921 * functionality -- there are no overridable methods dealing with 2922 * scheduling or execution. 
However, you can override initialization 2923 * and termination methods surrounding the main task processing loop. 2924 * If you do create such a subclass, you will also need to supply a 2925 * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to 2926 * {@linkplain ForkJoinPool#ForkJoinPool(int, ForkJoinWorkerThreadFactory, 2927 * UncaughtExceptionHandler, bool, int, int, int, Predicate, long, TimeUnit) 2928 * use it} in a {@code ForkJoinPool}. 2929 * 2930 * @author Doug Lea 2931 */ 2932 class ForkJoinWorkerThread : ThreadEx { 2933 /* 2934 * ForkJoinWorkerThreads are managed by ForkJoinPools and perform 2935 * ForkJoinTasks. For explanation, see the internal documentation 2936 * of class ForkJoinPool. 2937 * 2938 * This class just maintains links to its pool and WorkQueue. The 2939 * pool field is set immediately upon construction, but the 2940 * workQueue field is not set until a call to registerWorker 2941 * completes. This leads to a visibility race, that is tolerated 2942 * by requiring that the workQueue field is only accessed by the 2943 * owning thread. 2944 * 2945 * Support for (non-public) subclass InnocuousForkJoinWorkerThread 2946 * requires that we break quite a lot of encapsulation (via helper 2947 * methods in ThreadLocalRandom) both here and in the subclass to 2948 * access and set Thread fields. 2949 */ 2950 2951 ForkJoinPool pool; // the pool this thread works in 2952 WorkQueue workQueue; // work-stealing mechanics 2953 2954 /** 2955 * Creates a ForkJoinWorkerThread operating in the given pool. 2956 * 2957 * @param pool the pool this thread works in 2958 * @throws NullPointerException if pool is null 2959 */ 2960 protected this(ForkJoinPool pool) { 2961 // Use a placeholder until a useful name can be set in registerWorker 2962 super("aForkJoinWorkerThread"); 2963 this.pool = pool; 2964 this.workQueue = pool.registerWorker(this); 2965 } 2966 2967 /** 2968 * Version for use by the default pool. Supports setting the 2969 * context class loader. 
This is a separate constructor to avoid 2970 * affecting the protected constructor. 2971 */ 2972 // this(ForkJoinPool pool, ClassLoader ccl) { 2973 // super("aForkJoinWorkerThread"); 2974 // super.setContextClassLoader(ccl); 2975 // this.pool = pool; 2976 // this.workQueue = pool.registerWorker(this); 2977 // } 2978 2979 /** 2980 * Version for InnocuousForkJoinWorkerThread. 2981 */ 2982 this(ForkJoinPool pool, 2983 // ClassLoader ccl, 2984 ThreadGroupEx threadGroup, 2985 ) { // AccessControlContext acc 2986 super(threadGroup, null, "aForkJoinWorkerThread"); 2987 // super.setContextClassLoader(ccl); 2988 // ThreadLocalRandom.setInheritedAccessControlContext(this, acc); 2989 // ThreadLocalRandom.eraseThreadLocals(this); // clear before registering 2990 this.pool = pool; 2991 this.workQueue = pool.registerWorker(this); 2992 } 2993 2994 /** 2995 * Returns the pool hosting this thread. 2996 * 2997 * @return the pool 2998 */ 2999 ForkJoinPool getPool() { 3000 return pool; 3001 } 3002 3003 /** 3004 * Returns the unique index number of this thread in its pool. 3005 * The returned value ranges from zero to the maximum number of 3006 * threads (minus one) that may exist in the pool, and does not 3007 * change during the lifetime of the thread. This method may be 3008 * useful for applications that track status or collect results 3009 * per-worker-thread rather than per-task. 3010 * 3011 * @return the index number 3012 */ 3013 int getPoolIndex() { 3014 return workQueue.getPoolIndex(); 3015 } 3016 3017 /** 3018 * Initializes internal state after construction but before 3019 * processing any tasks. If you override this method, you must 3020 * invoke {@code super.onStart()} at the beginning of the method. 3021 * Initialization requires care: Most fields must have legal 3022 * default values, to ensure that attempted accesses from other 3023 * threads work correctly even before this thread starts 3024 * processing tasks. 
3025 */ 3026 protected void onStart() { 3027 } 3028 3029 /** 3030 * Performs cleanup associated with termination of this worker 3031 * thread. If you override this method, you must invoke 3032 * {@code super.onTermination} at the end of the overridden method. 3033 * 3034 * @param exception the exception causing this thread to abort due 3035 * to an unrecoverable error, or {@code null} if completed normally 3036 */ 3037 protected void onTermination(Throwable exception) { 3038 } 3039 3040 /** 3041 * This method is required to be public, but should never be 3042 * called explicitly. It performs the main run loop to execute 3043 * {@link ForkJoinTask}s. 3044 */ 3045 override void run() { 3046 if (workQueue.array !is null) 3047 return; 3048 // only run once 3049 Throwable exception = null; 3050 3051 try { 3052 onStart(); 3053 pool.runWorker(workQueue); 3054 } catch (Throwable ex) { 3055 version(HUNT_DEBUG) { 3056 warning(ex); 3057 } else { 3058 warning(ex.msg); 3059 } 3060 exception = ex; 3061 } 3062 3063 try { 3064 onTermination(exception); 3065 } catch(Throwable ex) { 3066 version(HUNT_DEBUG) { 3067 warning(ex); 3068 } else { 3069 warning(ex.msg); 3070 } 3071 if (exception is null) 3072 exception = ex; 3073 } 3074 pool.deregisterWorker(this, exception); 3075 } 3076 3077 /** 3078 * Non-hook method for InnocuousForkJoinWorkerThread. 3079 */ 3080 void afterTopLevelExec() { 3081 } 3082 3083 3084 int awaitJoin(IForkJoinTask task) { 3085 int s = task.getStatus(); 3086 WorkQueue w = workQueue; 3087 if(w.tryUnpush(task) && (s = task.doExec()) < 0 ) 3088 return s; 3089 else 3090 return pool.awaitJoin(w, task, MonoTime.zero()); 3091 } 3092 3093 /** 3094 * If the current thread is operating in a ForkJoinPool, 3095 * unschedules and returns, without executing, a task externally 3096 * submitted to the pool, if one is available. Availability may be 3097 * transient, so a {@code null} result does not necessarily imply 3098 * quiescence of the pool. 
This method is designed primarily to 3099 * support extensions, and is unlikely to be useful otherwise. 3100 * 3101 * @return a task, or {@code null} if none are available 3102 */ 3103 protected static IForkJoinTask pollSubmission() { 3104 ForkJoinWorkerThread t = cast(ForkJoinWorkerThread)Thread.getThis(); 3105 return t !is null ? t.pool.pollSubmission() : null; 3106 } 3107 } 3108 3109 3110 /** 3111 * A worker thread that has no permissions, is not a member of any 3112 * user-defined ThreadGroupEx, uses the system class loader as 3113 * thread context class loader, and erases all ThreadLocals after 3114 * running each top-level task. 3115 */ 3116 final class InnocuousForkJoinWorkerThread : ForkJoinWorkerThread { 3117 /** The ThreadGroupEx for all InnocuousForkJoinWorkerThreads */ 3118 private __gshared ThreadGroupEx innocuousThreadGroup; 3119 // AccessController.doPrivileged(new PrivilegedAction<>() { 3120 // ThreadGroupEx run() { 3121 // ThreadGroupEx group = Thread.getThis().getThreadGroup(); 3122 // for (ThreadGroupEx p; (p = group.getParent()) !is null; ) 3123 // group = p; 3124 // return new ThreadGroupEx( 3125 // group, "InnocuousForkJoinWorkerThreadGroup"); 3126 // }}); 3127 3128 /** An AccessControlContext supporting no privileges */ 3129 // private static final AccessControlContext INNOCUOUS_ACC = 3130 // new AccessControlContext( 3131 // new ProtectionDomain[] { new ProtectionDomain(null, null) }); 3132 3133 shared static this() { 3134 // ThreadGroupEx group = Thread.getThis().getThreadGroup(); 3135 // for (ThreadGroupEx p; (p = group.getParent()) !is null; ) 3136 // group = p; 3137 innocuousThreadGroup = new ThreadGroupEx( 3138 null, "InnocuousForkJoinWorkerThreadGroup"); 3139 } 3140 3141 this(ForkJoinPool pool) { 3142 super(pool, 3143 // ClassLoader.getSystemClassLoader(), 3144 innocuousThreadGroup, 3145 // INNOCUOUS_ACC 3146 ); 3147 } 3148 3149 override // to erase ThreadLocals 3150 void afterTopLevelExec() { 3151 // 
ThreadLocalRandom.eraseThreadLocals(this); 3152 } 3153 3154 override // to silently fail 3155 void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { } 3156 3157 // override // paranoically 3158 // void setContextClassLoader(ClassLoader cl) { 3159 // throw new SecurityException("setContextClassLoader"); 3160 // } 3161 } 3162 3163 3164 /** 3165 * Queues supporting work-stealing as well as external task 3166 * submission. See above for descriptions and algorithms. 3167 */ 3168 final class WorkQueue { 3169 int source; // source queue id, or sentinel 3170 int id; // pool index, mode, tag 3171 shared int base; // index of next slot for poll 3172 shared int top; // index of next slot for push 3173 shared int phase; // versioned, negative: queued, 1: locked 3174 int stackPred; // pool stack (ctl) predecessor link 3175 int nsteals; // number of steals 3176 IForkJoinTask[] array; // the queued tasks; power of 2 size 3177 ForkJoinPool pool; // the containing pool (may be null) 3178 ForkJoinWorkerThread owner; // owning thread or null if shared 3179 3180 this(ForkJoinPool pool, ForkJoinWorkerThread owner) { 3181 this.pool = pool; 3182 this.owner = owner; 3183 // Place indices in the center of array (that is not yet allocated) 3184 base = top = INITIAL_QUEUE_CAPACITY >>> 1; 3185 } 3186 3187 /** 3188 * Tries to lock shared queue by CASing phase field. 3189 */ 3190 final bool tryLockPhase() { 3191 return cas(&this.phase, 0, 1); 3192 // return PHASE.compareAndSet(this, 0, 1); 3193 } 3194 3195 final void releasePhaseLock() { 3196 // PHASE.setRelease(this, 0); 3197 atomicStore(this.phase, 0); 3198 } 3199 3200 /** 3201 * Returns an exportable index (used by ForkJoinWorkerThread). 3202 */ 3203 final int getPoolIndex() { 3204 return (id & 0xffff) >>> 1; // ignore odd/even tag bit 3205 } 3206 3207 /** 3208 * Returns the approximate number of tasks in the queue. 
3209 */ 3210 final int queueSize() { 3211 // int n = cast(int)BASE.getAcquire(this) - top; 3212 int n = atomicLoad(this.base) - top; 3213 return (n >= 0) ? 0 : -n; // ignore negative 3214 } 3215 3216 /** 3217 * Provides a more accurate estimate of whether this queue has 3218 * any tasks than does queueSize, by checking whether a 3219 * near-empty queue has at least one unclaimed task. 3220 */ 3221 final bool isEmpty() { 3222 IForkJoinTask[] a; int n, cap, b; 3223 // VarHandle.acquireFence(); // needed by external callers 3224 return ((n = (b = base) - top) >= 0 || // possibly one task 3225 (n == -1 && ((a = array) is null || 3226 (cap = cast(int)a.length) == 0 || 3227 a[(cap - 1) & b] is null))); 3228 } 3229 3230 /** 3231 * Pushes a task. Call only by owner in unshared queues. 3232 * 3233 * @param task the task. Caller must ensure non-null. 3234 * @throws RejectedExecutionException if array cannot be resized 3235 */ 3236 final void push(IForkJoinTask task) { 3237 IForkJoinTask[] a; 3238 int s = top, d, cap, m; 3239 ForkJoinPool p = pool; 3240 if ((a = array) !is null && (cap = cast(int)a.length) > 0) { 3241 m = cap - 1; 3242 // FIXME: Needing refactor or cleanup -@zxp at 2019/2/9 8:40:55 3243 // 3244 // AtomicHelper.store(a[m & s], task); 3245 a[m & s] = task; 3246 // QA.setRelease(a, (m = cap - 1) & s, task); 3247 top = s + 1; 3248 if (((d = s - atomicLoad(this.base)) & ~1) == 0 && 3249 p !is null) { // size 0 or 1 3250 // VarHandle.fullFence(); 3251 p.signalWork(); 3252 } 3253 else if (d == m) 3254 growArray(false); 3255 } 3256 } 3257 3258 /** 3259 * Version of push for shared queues. Call only with phase lock held. 
3260 * @return true if should signal work 3261 */ 3262 final bool lockedPush(IForkJoinTask task) { 3263 IForkJoinTask[] a; 3264 bool signal = false; 3265 int s = top, b = base, cap, d; 3266 if ((a = array) !is null && (cap = cast(int)a.length) > 0) { 3267 a[(cap - 1) & s] = task; 3268 top = s + 1; 3269 if (b - s + cap - 1 == 0) 3270 growArray(true); 3271 else { 3272 phase = 0; // full unlock 3273 if (((s - base) & ~1) == 0) // size 0 or 1 3274 signal = true; 3275 } 3276 } 3277 return signal; 3278 } 3279 3280 /** 3281 * Doubles the capacity of array. Call either by owner or with 3282 * lock held -- it is OK for base, but not top, to move while 3283 * resizings are in progress. 3284 */ 3285 final void growArray(bool locked) { 3286 IForkJoinTask[] newA = null; 3287 try { 3288 IForkJoinTask[] oldA; int oldSize, newSize; 3289 if ((oldA = array) !is null && (oldSize = cast(int)oldA.length) > 0 && 3290 (newSize = oldSize << 1) <= MAXIMUM_QUEUE_CAPACITY && 3291 newSize > 0) { 3292 try { 3293 newA = new IForkJoinTask[newSize]; 3294 } catch (OutOfMemoryError ex) { 3295 } 3296 if (newA !is null) { // poll from old array, push to new 3297 int oldMask = oldSize - 1, newMask = newSize - 1; 3298 for (int s = top - 1, k = oldMask; k >= 0; --k) { 3299 // IForkJoinTask x = AtomicHelper.getAndSet(oldA[s & oldMask], null); 3300 // FIXME: Needing refactor or cleanup -@zxp at 2019/2/9 下午8:57:26 3301 // 3302 IForkJoinTask x = oldA[s & oldMask]; 3303 oldA[s & oldMask] = null; 3304 3305 if (x !is null) 3306 newA[s-- & newMask] = x; 3307 else 3308 break; 3309 } 3310 array = newA; 3311 // VarHandle.releaseFence(); 3312 } 3313 } 3314 } finally { 3315 if (locked) 3316 phase = 0; 3317 } 3318 if (newA is null) 3319 throw new RejectedExecutionException("Queue capacity exceeded"); 3320 } 3321 3322 /** 3323 * Takes next task, if one exists, in FIFO order. 
3324 */ 3325 final IForkJoinTask poll() { 3326 int b, k, cap; 3327 IForkJoinTask[] a; 3328 while ((a = array) !is null && (cap = cast(int)a.length) > 0 && 3329 top - (b = base) > 0) { 3330 k = (cap - 1) & b; 3331 // FIXME: Needing refactor or cleanup -@zxp at 2019/2/9 8:42:05 3332 // 3333 3334 // IForkJoinTask t = AtomicHelper.load(a[k]); 3335 IForkJoinTask t = a[k]; 3336 if (base == b++) { 3337 if (t is null) 3338 Thread.yield(); // await index advance 3339 else if (AtomicHelper.compareAndSet(a[k], t, null)) { 3340 // else if (QA.compareAndSet(a, k, t, null)) { 3341 AtomicHelper.store(this.base, b); 3342 return t; 3343 } 3344 } 3345 } 3346 return null; 3347 } 3348 3349 /** 3350 * Takes next task, if one exists, in order specified by mode. 3351 */ 3352 final IForkJoinTask nextLocalTask() { 3353 IForkJoinTask t = null; 3354 int md = id, b, s, d, cap; IForkJoinTask[] a; 3355 if ((a = array) !is null && (cap = cast(int)a.length) > 0 && 3356 (d = (s = top) - (b = base)) > 0) { 3357 if ((md & FIFO) == 0 || d == 1) { 3358 auto index = (cap - 1) & --s; 3359 t = AtomicHelper.getAndSet(a[index], cast(IForkJoinTask)null); 3360 if(t !is null) { 3361 AtomicHelper.store(this.top, s); 3362 } 3363 } else { 3364 auto index = (cap - 1) & b++; 3365 t = AtomicHelper.getAndSet(a[index], cast(IForkJoinTask)null); 3366 if (t !is null) { 3367 AtomicHelper.store(this.base, b); 3368 } 3369 else // on contention in FIFO mode, use regular poll 3370 t = poll(); 3371 } 3372 } 3373 return t; 3374 } 3375 3376 /** 3377 * Returns next task, if one exists, in order specified by mode. 3378 */ 3379 final IForkJoinTask peek() { 3380 int cap; IForkJoinTask[] a; 3381 return ((a = array) !is null && (cap = cast(int)a.length) > 0) ? 3382 a[(cap - 1) & ((id & FIFO) != 0 ? base : top - 1)] : null; 3383 } 3384 3385 /** 3386 * Pops the given task only if it is at the current top. 
3387 */ 3388 final bool tryUnpush(IForkJoinTask task) { 3389 bool popped = false; 3390 int s, cap; IForkJoinTask[] a; 3391 if ((a = array) !is null && (cap = cast(int)a.length) > 0 && 3392 (s = top) != base) { 3393 popped = AtomicHelper.compareAndSet(a[(cap - 1) & --s], task, null); 3394 if(popped) { 3395 AtomicHelper.store(this.top, s); 3396 } 3397 } 3398 return popped; 3399 } 3400 3401 /** 3402 * Shared version of tryUnpush. 3403 */ 3404 final bool tryLockedUnpush(IForkJoinTask task) { 3405 bool popped = false; 3406 int s = top - 1, k, cap; IForkJoinTask[] a; 3407 if ((a = array) !is null && (cap = cast(int)a.length) > 0 && 3408 a[k = (cap - 1) & s] == task && tryLockPhase()) { 3409 if (top == s + 1 && array == a) { 3410 popped = AtomicHelper.compareAndSet(a[k], task, null); 3411 if(popped) top = s; 3412 } 3413 releasePhaseLock(); 3414 } 3415 return popped; 3416 } 3417 3418 /** 3419 * Removes and cancels all known tasks, ignoring any exceptions. 3420 */ 3421 final void cancelAll() { 3422 for (IForkJoinTask t; (t = poll()) !is null; ) 3423 IForkJoinTask.cancelIgnoringExceptions(t); 3424 } 3425 3426 // Specialized execution methods 3427 3428 /** 3429 * Runs the given (stolen) task if nonnull, as well as 3430 * remaining local tasks and others available from the given 3431 * queue, up to bound n (to avoid infinite unfairness). 3432 */ 3433 final void topLevelExec(IForkJoinTask t, WorkQueue q, int n) { 3434 if (t !is null && q !is null) { // hoist checks 3435 int nstolen = 1; 3436 for (;;) { 3437 t.doExec(); 3438 if (n-- < 0) 3439 break; 3440 else if ((t = nextLocalTask()) is null) { 3441 if ((t = q.poll()) is null) 3442 break; 3443 else 3444 ++nstolen; 3445 } 3446 } 3447 ForkJoinWorkerThread thread = owner; 3448 nsteals += nstolen; 3449 source = 0; 3450 if (thread !is null) 3451 thread.afterTopLevelExec(); 3452 } 3453 } 3454 3455 /** 3456 * If present, removes task from queue and executes it. 
*/
    final void tryRemoveAndExec(IForkJoinTask task) {
        IForkJoinTask[] a; int s, cap;
        if ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
            (s = top) - base > 0) { // traverse from top
            for (int m = cap - 1, ns = s - 1, i = ns; ; --i) {
                int index = i & m;
                IForkJoinTask t = a[index];
                if (t is null)
                    break;      // reached an empty slot: task is not queued
                else if (t == task) {
                    // Claim the slot atomically. Other methods of this queue
                    // (poll, helpAsyncBlocker) take tasks by CAS on the same
                    // slots, so a plain "check then store null" could let the
                    // same task be taken and executed twice.
                    if (AtomicHelper.compareAndSet(a[index], t, null)) {
                        top = ns;   // safely shift down
                        // Close the gap by moving every task above the
                        // removed slot one position towards the base.
                        for (int j = i; j != ns; ++j) {
                            int pindex = (j + 1) & m;
                            IForkJoinTask f = a[pindex];
                            a[pindex] = null;
                            a[j & m] = f;
                        }
                        t.doExec();
                    }
                    break;
                }
            }
        }
    }

    /**
     * Tries to pop and run tasks within the target's computation
     * until done, not found, or limit exceeded.
*
     * @param task root of CountedCompleter computation
     * @param limit max runs, or zero for no limit
     * @param isShared true if must lock to extract task
     * @return task status on exit
     */
    final int helpCC(ICountedCompleter task, int limit, bool isShared) {
        int status = 0;
        if (task !is null && (status = task.getStatus()) >= 0) {
            int s, k, cap; IForkJoinTask[] a;
            while ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
                   (s = top) - base > 0) {
                ICountedCompleter v = null;
                IForkJoinTask o = a[k = (cap - 1) & (s - 1)];
                // The cast yields null when the top task is not a completer.
                ICountedCompleter t = cast(ICountedCompleter)o;
                if (t !is null) {
                    // Walk up the completer chain of the top task looking
                    // for the given root.
                    for (ICountedCompleter f = t;;) {
                        if (f != task) {
                            if ((f = f.getCompleter()) is null)
                                break;
                        }
                        else if (isShared) {
                            if (tryLockPhase()) {
                                // `is` tests array identity; `==` on D
                                // dynamic arrays would compare every element.
                                if (top == s && array is a &&
                                    AtomicHelper.compareAndSet(a[k], t, cast(IForkJoinTask)null)) {
                                    top = s - 1;
                                    v = t;
                                }
                                releasePhaseLock();
                            }
                            break;
                        }
                        else {
                            if (AtomicHelper.compareAndSet(a[k], t, cast(IForkJoinTask)null)) {
                                top = s - 1;
                                v = t;
                            }
                            break;
                        }
                    }
                }
                if (v !is null)
                    v.doExec();
                // Stop when the root is done, nothing was popped this round,
                // or the run limit is used up.
                if ((status = task.getStatus()) < 0 || v is null ||
                    (limit != 0 && --limit == 0))
                    break;
            }
        }
        return status;
    }

    /**
     * Tries to poll and run AsynchronousCompletionTasks until
     * none found or blocker is released
     *
     * @param blocker the blocker
     */
    final void helpAsyncBlocker(ManagedBlocker blocker) {
        if (blocker !is null) {
            int b, k, cap; IForkJoinTask[] a; IForkJoinTask t;
            while ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
                   top - (b = base) > 0) {
                k = (cap - 1) & b;
                // t = AtomicHelper.load(a[k]);
                t = a[k];
                if (blocker.isReleasable())
                    break;
                else if (base == b++ && t !is null) {
                    // Only tasks implementing the marker interface are run
                    // here; stop at the first task of any other kind.
                    AsynchronousCompletionTask at = cast(AsynchronousCompletionTask)t;
                    if (at is null)
                        break;
                    else if (AtomicHelper.compareAndSet(a[k], t, null)) {
                        AtomicHelper.store(this.base, b);
                        t.doExec();
                    }
                }
            }
        }
    }

    /**
     * Returns true if owned and not known to be blocked.
     */
    final bool isApparentlyUnblocked() {
        ThreadEx wt; ThreadState s;
        return ((wt = owner) !is null &&
                (s = wt.getState()) != ThreadState.BLOCKED &&
                s != ThreadState.WAITING &&
                s != ThreadState.TIMED_WAITING);
    }

    // VarHandle mechanics.
    // static final VarHandle PHASE;
    // static final VarHandle BASE;
    // static final VarHandle TOP;
    // static {
    //     try {
    //         MethodHandles.Lookup l = MethodHandles.lookup();
    //         PHASE = l.findVarHandle(WorkQueue.class, "phase", int.class);
    //         BASE = l.findVarHandle(WorkQueue.class, "base", int.class);
    //         TOP = l.findVarHandle(WorkQueue.class, "top", int.class);
    //     } catch (ReflectiveOperationException e) {
    //         throw new ExceptionInInitializerError(e);
    //     }
    // }
}



/**
 * A marker interface identifying asynchronous tasks produced by
 * {@code async} methods. This may be useful for monitoring,
 * debugging, and tracking asynchronous activities.
 *
 */
interface AsynchronousCompletionTask {
}