/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

module hunt.concurrency.ForkJoinTask;

import hunt.concurrency.atomic;
import hunt.concurrency.Future;
import hunt.concurrency.thread;

import hunt.concurrency.ForkJoinPool;
import hunt.concurrency.ForkJoinTaskHelper;

import hunt.collection.Collection;
import hunt.logging.ConsoleLogger;
import hunt.Exceptions;
import hunt.util.Common;
import hunt.util.DateTime;
import hunt.util.Runnable;

import core.time;
import core.sync.condition;
import core.sync.mutex;
import core.thread;


/**
 * Abstract base class for tasks that run within a {@link ForkJoinPool}.
 * A {@code ForkJoinTask} is a thread-like entity that is much
 * lighter weight than a normal thread. Huge numbers of tasks and
 * subtasks may be hosted by a small number of actual threads in a
 * ForkJoinPool, at the price of some usage limitations.
 *
 * <p>A "main" {@code ForkJoinTask} begins execution when it is
 * explicitly submitted to a {@link ForkJoinPool}, or, if not already
 * engaged in a ForkJoin computation, commenced in the {@link
 * ForkJoinPool#commonPool()} via {@link #fork}, {@link #invoke}, or
 * related methods. Once started, it will usually in turn start other
 * subtasks. As indicated by the name of this class, many programs
 * using {@code ForkJoinTask} employ only methods {@link #fork} and
 * {@link #join}, or derivatives such as {@link
 * #invokeAll(ForkJoinTask...) invokeAll}. However, this class also
 * provides a number of other methods that can come into play in
 * advanced usages, as well as extension mechanics that allow support
 * of new forms of fork/join processing.
 *
 * <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
 * The efficiency of {@code ForkJoinTask}s stems from a set of
 * restrictions (that are only partially statically enforceable)
 * reflecting their main use as computational tasks calculating pure
 * functions or operating on purely isolated objects. The primary
 * coordination mechanisms are {@link #fork}, that arranges
 * asynchronous execution, and {@link #join}, that doesn't proceed
 * until the task's result has been computed. Computations should
 * ideally avoid {@code synchronized} methods or blocks, and should
 * minimize other blocking synchronization apart from joining other
 * tasks or using synchronizers such as Phasers that are advertised to
 * cooperate with fork/join scheduling. Subdividable tasks should also
 * not perform blocking I/O, and should ideally access variables that
 * are completely independent of those accessed by other running
 * tasks. These guidelines are loosely enforced by not permitting
 * checked exceptions such as {@code IOExceptions} to be
 * thrown. However, computations may still encounter unchecked
 * exceptions, that are rethrown to callers attempting to join
 * them. These exceptions may additionally include {@link
 * RejectedExecutionException} stemming from internal resource
 * exhaustion, such as failure to allocate internal task
 * queues. Rethrown exceptions behave in the same way as regular
 * exceptions, but, when possible, contain stack traces (as displayed
 * for example using {@code ex.printStackTrace()}) of both the thread
 * that initiated the computation as well as the thread actually
 * encountering the exception; minimally only the latter.
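 *
 * <p>A minimal sketch of this coordination, assuming {@code left} and
 * {@code right} are instances of some concrete {@code ForkJoinTask!(int)}
 * subclass (both names are hypothetical and not part of this module):
 * <pre> {@code
 * left.fork();              // arrange asynchronous execution of left
 * int r = right.invoke();   // run right, starting in the current thread
 * int l = left.join();      // does not proceed until left has completed
 * int sum = l + r;
 * }</pre>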
 *
 * <p>It is possible to define and use ForkJoinTasks that may block,
 * but doing so requires three further considerations: (1) Completion
 * of few if any <em>other</em> tasks should be dependent on a task
 * that blocks on external synchronization or I/O. Event-style async
 * tasks that are never joined (for example, those subclassing {@link
 * CountedCompleter}) often fall into this category. (2) To minimize
 * resource impact, tasks should be small; ideally performing only the
 * (possibly) blocking action. (3) Unless the {@link
 * ForkJoinPool.ManagedBlocker} API is used, or the number of possibly
 * blocked tasks is known to be less than the pool's {@link
 * ForkJoinPool#getParallelism} level, the pool cannot guarantee that
 * enough threads will be available to ensure progress or good
 * performance.
 *
 * <p>The primary method for awaiting completion and extracting
 * results of a task is {@link #join}, but there are several variants:
 * The {@link Future#get} methods support interruptible and/or timed
 * waits for completion and report results using {@code Future}
 * conventions. Method {@link #invoke} is semantically
 * equivalent to {@code fork(); join()} but always attempts to begin
 * execution in the current thread. The "<em>quiet</em>" forms of
 * these methods do not extract results or report exceptions. These
 * may be useful when a set of tasks are being executed, and you need
 * to delay processing of results or exceptions until all complete.
 * Method {@code invokeAll} (available in multiple versions)
 * performs the most common form of parallel invocation: forking a set
 * of tasks and joining them all.
 *
 * <p>In the most typical usages, a fork-join pair act like a call
 * (fork) and return (join) from a parallel recursive function. As is
 * the case with other forms of recursive calls, returns (joins)
 * should be performed innermost-first.
 * For example, {@code a.fork();
 * b.fork(); b.join(); a.join();} is likely to be substantially more
 * efficient than joining {@code a} before {@code b}.
 *
 * <p>The execution status of tasks may be queried at several levels
 * of detail: {@link #isDone} is true if a task completed in any way
 * (including the case where a task was cancelled without executing);
 * {@link #isCompletedNormally} is true if a task completed without
 * cancellation or encountering an exception; {@link #isCancelled} is
 * true if the task was cancelled (in which case {@link #getException}
 * returns a {@link CancellationException}); and
 * {@link #isCompletedAbnormally} is true if a task was either
 * cancelled or encountered an exception, in which case {@link
 * #getException} will return either the encountered exception or
 * {@link CancellationException}.
 *
 * <p>The ForkJoinTask class is not usually directly subclassed.
 * Instead, you subclass one of the abstract classes that support a
 * particular style of fork/join processing, typically {@link
 * RecursiveAction} for most computations that do not return results,
 * {@link RecursiveTask} for those that do, and {@link
 * CountedCompleter} for those in which completed actions trigger
 * other actions. Normally, a concrete ForkJoinTask subclass declares
 * fields comprising its parameters, established in a constructor, and
 * then defines a {@code compute} method that somehow uses the control
 * methods supplied by this base class.
 *
 * <p>Method {@link #join} and its variants are appropriate for use
 * only when completion dependencies are acyclic; that is, the
 * parallel computation can be described as a directed acyclic graph
 * (DAG). Otherwise, executions may encounter a form of deadlock as
 * tasks cyclically wait for each other.
 * However, this framework supports other methods and techniques
 * (for example the use of
 * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
 * may be of use in constructing custom subclasses for problems that
 * are not statically structured as DAGs. To support such usages, a
 * ForkJoinTask may be atomically <em>tagged</em> with a {@code short}
 * value using {@link #setForkJoinTaskTag} or {@link
 * #compareAndSetForkJoinTaskTag} and checked using {@link
 * #getForkJoinTaskTag}. The ForkJoinTask implementation does not use
 * these {@code protected} methods or tags for any purpose, but they
 * may be of use in the construction of specialized subclasses. For
 * example, parallel graph traversals can use the supplied methods to
 * avoid revisiting nodes/tasks that have already been processed.
 * (Method names for tagging are bulky in part to encourage definition
 * of methods that reflect their usage patterns.)
 *
 * <p>Most base support methods are {@code final}, to prevent
 * overriding of implementations that are intrinsically tied to the
 * underlying lightweight task scheduling framework. Developers
 * creating new basic styles of fork/join processing should minimally
 * implement {@code protected} methods {@link #exec}, {@link
 * #setRawResult}, and {@link #getRawResult}, while also introducing
 * an abstract computational method that can be implemented in its
 * subclasses, possibly relying on other {@code protected} methods
 * provided by this class.
 *
 * <p>ForkJoinTasks should perform relatively small amounts of
 * computation. Large tasks should be split into smaller subtasks,
 * usually via recursive decomposition. As a very rough rule of thumb,
 * a task should perform more than 100 and less than 10000 basic
 * computational steps, and should avoid indefinite looping. If tasks
 * are too big, then parallelism cannot improve throughput.
 * If too small, then memory and internal task maintenance overhead
 * may overwhelm processing.
 *
 * <p>This class provides {@code adapt} methods for {@link Runnable}
 * and {@link Callable}, that may be of use when mixing execution of
 * {@code ForkJoinTasks} with other kinds of tasks. When all tasks are
 * of this form, consider using a pool constructed in <em>asyncMode</em>.
 *
 * <p>ForkJoinTasks are {@code Serializable}, which enables them to be
 * used in extensions such as remote execution frameworks. It is
 * sensible to serialize tasks only before or after, but not during,
 * execution. Serialization is not relied on during execution itself.
 *
 * @author Doug Lea
 */
abstract class ForkJoinTask(V) : Future!(V), IForkJoinTask {

    /*
     * See the internal documentation of class ForkJoinPool for a
     * general implementation overview. ForkJoinTasks are mainly
     * responsible for maintaining their "status" field amidst relays
     * to methods in ForkJoinWorkerThread and ForkJoinPool.
     *
     * The methods of this class are more-or-less layered into
     * (1) basic status maintenance
     * (2) execution and awaiting completion
     * (3) user-level methods that additionally report results.
     * This is sometimes hard to see because this file orders exported
     * methods in a way that flows well in javadocs.
     */

    /**
     * The status field holds run control status bits packed into a
     * single int to ensure atomicity. Status is initially zero, and
     * takes on nonnegative values until completed, upon which it
     * holds (sign bit) DONE, possibly with ABNORMAL (cancelled or
     * exceptional) and THROWN (in which case an exception has been
     * stored). Tasks with dependent blocked waiting joiners have the
     * SIGNAL bit set. Completion of a task with SIGNAL set awakens
     * any waiters via notifyAll. (Waiters also help signal others
     * upon completion.)
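     *
     * As a rough sketch of how these bits combine, the query methods
     * of this class apply the following masks to a status value s:
     *   s >= 0                                  not yet done
     *   (s & (DONE | ABNORMAL)) == DONE         completed normally
     *   (s & (ABNORMAL | THROWN)) == ABNORMAL   cancelled
     *   (s & THROWN) != 0                       completed with an exception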
     *
     * These control bits occupy only (some of) the upper half (16
     * bits) of status field. The lower bits are used for user-defined
     * tags.
     */
    shared int status; // accessed directly by pool and workers

    Mutex thisMutex;
    Condition thisLocker;

    private enum int DONE = 1 << 31; // must be negative
    private enum int ABNORMAL = 1 << 18; // set atomically with DONE
    private enum int THROWN = 1 << 17; // set atomically with ABNORMAL
    private enum int SIGNAL = 1 << 16; // true if joiner waiting
    private enum int SMASK = 0xffff; // short bits for tags

    this() {
        thisMutex = new Mutex(this);
        thisLocker = new Condition(thisMutex);
    }

    static bool isExceptionalStatus(int s) { // needed by subclasses
        return (s & THROWN) != 0;
    }

    /**
     * Sets DONE status and wakes up threads waiting to join this task.
     *
     * @return status on exit
     */
    private int setDone() {
        int s = AtomicHelper.getAndBitwiseOr(this.status, DONE);
        debug(HUNT_CONCURRENCY_DEBUG) {
            tracef("status: last=%d, new=%d", s, status);
        }
        if((s & SIGNAL) != 0) {
            synchronized (this) {
                debug(HUNT_CONCURRENCY_DEBUG) info("notifying on done .....");
                thisLocker.notifyAll();
            }
        }
        return s | DONE;
    }

    /**
     * Marks cancelled or exceptional completion unless already done.
     *
     * @param completion must be DONE | ABNORMAL, ORed with THROWN if exceptional
     * @return status on exit
     */
    private int abnormalCompletion(int completion) {
        import core.atomic : cas;

        for (int s, ns;;) {
            if ((s = status) < 0) {
                return s;
            }
            // Publish the completion bits with a compare-and-swap so that a
            // concurrent update (for example a joiner setting SIGNAL) is not
            // overwritten; retry if the status changed in the meantime.
            ns = s | completion;
            if (cas(&this.status, s, ns)) {
                if ((s & SIGNAL) != 0)
                    synchronized (this) {
                        thisLocker.notifyAll();
                    }
                return ns;
            }
        }
    }

    int getStatus() {
        return status;
    }

    /**
     * Primary execution method for stolen tasks. Unless done, calls
     * exec and records status if completed, but doesn't wait for
     * completion otherwise.
     *
     * @return status on exit from this method
     */
    final int doExec() {
        int s; bool completed;
        if ((s = status) >= 0) {
            try {
                completed = exec();
            } catch (Throwable rex) {
                completed = false;
                s = setExceptionalCompletion(rex);
            }
            debug(HUNT_CONCURRENCY_DEBUG) tracef("completed: %s", completed);
            if (completed) {
                s = setDone();
            }
        }
        return s;
    }

    /**
     * If not done, sets SIGNAL status and performs Object.wait(timeout).
     * This task may or may not be done on exit. Ignores interrupts.
     *
     * @param timeout using Object.wait conventions.
     */
    final void internalWait(long timeout) {
        // Set SIGNAL in the status field itself (not just in a local copy)
        // so that the completing thread knows a waiter must be notified.
        int s = AtomicHelper.getAndBitwiseOr(this.status, SIGNAL);
        if (s >= 0) {
            synchronized (this) {
                if (status >= 0)
                    try {
                        thisLocker.wait(dur!(TimeUnit.Millisecond)(timeout));
                    } catch (InterruptedException ie) { }
                else
                    thisLocker.notifyAll();
            }
        }
    }

    /**
     * Blocks a non-worker-thread until completion.
     * @return status upon completion
     */
    private int externalAwaitDone() {
        int s = tryExternalHelp();
        if(s < 0)
            return s;

        s = AtomicHelper.getAndBitwiseOr(this.status, SIGNAL);
        debug(HUNT_CONCURRENCY_DEBUG) {
            infof("status: last=%d, new=%d", s, status);
        }
        if(s < 0)
            return s;

        bool interrupted = false;
        synchronized (this) {
            for (;;) {
                if ((s = status) >= 0) {
                    try {
                        thisLocker.wait(Duration.zero);
                    } catch (InterruptedException ie) {
                        interrupted = true;
                    }
                }
                else {
                    thisLocker.notifyAll();
                    break;
                }
            }
        }
        if (interrupted) {
            ThreadEx th = cast(ThreadEx) Thread.getThis();
            if(th !is null)
                th.interrupt();
        }
        return s;
    }

    /**
     * Blocks a non-worker-thread until completion or interruption.
     */
    private int externalInterruptibleAwaitDone() {
        int s = tryExternalHelp();
        if(s < 0) {
            if (ThreadEx.interrupted())
                throw new InterruptedException();
            return s;
        }

        s = AtomicHelper.getAndBitwiseOr(this.status, SIGNAL);
        debug(HUNT_CONCURRENCY_DEBUG) {
            infof("status: last=%d, new=%d", s, status);
        }
        if (s >= 0) {
            synchronized (this) {
                for (;;) {
                    if ((s = status) >= 0)
                        thisLocker.wait(Duration.zero);
                    else {
                        thisLocker.notifyAll();
                        break;
                    }
                }
            }
        }
        else if (ThreadEx.interrupted())
            throw new InterruptedException();
        return s;
    }

    /**
     * Tries to help with tasks allowed for external callers.
     *
     * @return current status
     */
    private int tryExternalHelp() {
        int s = status;
        if(s < 0) return s;
        ICountedCompleter cc = cast(ICountedCompleter)this;
        if(cc !is null) {
            return ForkJoinPool.common.externalHelpComplete(
                    cc, 0);
        } else if(ForkJoinPool.common.tryExternalUnpush(this)) {
            return doExec();
        } else
            return 0;
        // return ((s = status) < 0 ? s:
        //         (this instanceof CountedCompleter) ?
        //         ForkJoinPool.common.externalHelpComplete(
        //             (ICountedCompleter)this, 0) :
        //         ForkJoinPool.common.tryExternalUnpush(this) ?
        //         doExec() : 0);
    }

    /**
     * Implementation for join, get, quietlyJoin. Directly handles
     * only cases of already-completed, external wait, and
     * unfork+exec. Others are relayed to ForkJoinPool.awaitJoin.
     *
     * @return status upon completion
     */
    private int doJoin() {
        int s = status;
        if(s < 0) return s;

        ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)Thread.getThis();
        if(wt !is null) {
            WorkQueue w = wt.workQueue;
            if(w.tryUnpush(this) && (s = doExec()) < 0 )
                return s;
            else
                return wt.pool.awaitJoin(w, this, MonoTime.zero);
        } else {
            return externalAwaitDone();
        }
    }

    /**
     * Implementation for invoke, quietlyInvoke.
     *
     * @return status upon completion
     */
    private int doInvoke() {
        int s = doExec();
        if(s < 0)
            return s;
        ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)Thread.getThis();
        if(wt !is null) {
            return wt.pool.awaitJoin(wt.workQueue, this, MonoTime.zero());
        } else {
            return externalAwaitDone();
        }
    }

    /**
     * Records exception and sets status.
     *
     * @return status on exit
     */
    final int recordExceptionalCompletion(Throwable ex) {
        int s;
        if ((s = status) >= 0) {
            size_t h = this.toHash();
            ReentrantLock lock = ForkJoinTaskHelper.exceptionTableLock;
            lock.lock();
            try {
                ExceptionNode[] t = ForkJoinTaskHelper.exceptionTable;
                size_t i = h & (t.length - 1);
                for (ExceptionNode e = t[i]; ; e = e.next) {
                    if (e is null) {
                        t[i] = new ExceptionNode(this, ex, t[i]);
                        break;
                    }
                    if (e.get() == this) // already present
                        break;
                }
            } finally {
                lock.unlock();
            }
            s = abnormalCompletion(DONE | ABNORMAL | THROWN);
        }
        return s;
    }

    /**
     * Records exception and possibly propagates.
     *
     * @return status on exit
     */
    private int setExceptionalCompletion(Throwable ex) {
        int s = recordExceptionalCompletion(ex);
        if ((s & THROWN) != 0)
            internalPropagateException(ex);
        return s;
    }

    /**
     * Hook for exception propagation support for tasks with completers.
     */
    void internalPropagateException(Throwable ex) {
    }

    /**
     * Removes exception node and clears status.
     */
    private void clearExceptionalCompletion() {
        size_t h = this.toHash();
        ReentrantLock lock = ForkJoinTaskHelper.exceptionTableLock;
        lock.lock();
        try {
            ExceptionNode[] t = ForkJoinTaskHelper.exceptionTable;
            size_t i = h & (t.length - 1);
            ExceptionNode e = t[i];
            ExceptionNode pred = null;
            while (e !is null) {
                ExceptionNode next = e.next;
                if (e.get() == this) {
                    if (pred is null)
                        t[i] = next;
                    else
                        pred.next = next;
                    break;
                }
                pred = e;
                e = next;
            }
            status = 0;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns a rethrowable exception for this task, if available.
     * To provide accurate stack traces, if the exception was not
     * thrown by the current thread, we try to create a new exception
     * of the same type as the one thrown, but with the recorded
     * exception as its cause. If there is no such constructor, we
     * instead try to use a no-arg constructor, followed by initCause,
     * to the same effect. If none of these apply, or any fail due to
     * other exceptions, we return the recorded exception, which is
     * still correct, although it may contain a misleading stack
     * trace.
     *
     * <p>Note: this port does not currently attempt the rewrapping
     * described above; it always returns the recorded exception.
     *
     * @return the exception, or null if none
     */
    private Throwable getThrowableException() {
        size_t h = this.toHash();
        ExceptionNode e;
        ReentrantLock lock = ForkJoinTaskHelper.exceptionTableLock;
        lock.lock();
        try {
            ExceptionNode[] t = ForkJoinTaskHelper.exceptionTable;
            e = t[h & ($ - 1)];
            while (e !is null && e.get() !is this)
                e = e.next;
        } finally {
            lock.unlock();
        }
        Throwable ex;
        if (e is null || (ex = e.ex) is null)
            return null;
        return ex;
    }


    /**
     * Throws exception, if any, associated with the given status.
     */
    private void reportException(int s) {
        ForkJoinTaskHelper.rethrow((s & THROWN) != 0 ? getThrowableException() :
                new CancellationException());
    }

    // methods

    /**
     * Arranges to asynchronously execute this task in the pool the
     * current task is running in, if applicable, or using the {@link
     * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While
     * it is not necessarily enforced, it is a usage error to fork a
     * task more than once unless it has completed and been
     * reinitialized. Subsequent modifications to the state of this
     * task or any data it operates on are not necessarily
     * consistently observable by any thread other than the one
     * executing it unless preceded by a call to {@link #join} or
     * related methods, or a call to {@link #isDone} returning {@code
     * true}.
     *
     * @return {@code this}, to simplify usage
     */
    final ForkJoinTask!(V) fork() {
        ForkJoinWorkerThread t = cast(ForkJoinWorkerThread)Thread.getThis();
        if (t !is null)
            t.workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

    /**
     * Returns the result of the computation when it
     * {@linkplain #isDone is done}.
     * This method differs from {@link #get()} in that abnormal
     * completion results in {@code RuntimeException} or {@code Error},
     * not {@code ExecutionException}, and that interrupts of the
     * calling thread do <em>not</em> cause the method to abruptly
     * return by throwing {@code InterruptedException}.
     *
     * @return the computed result
     */
    final V join() {
        int s;
        if (((s = doJoin()) & ABNORMAL) != 0) {
            reportException(s);
        }

        static if(!is(V == void)) {
            return getRawResult();
        }
    }

    /**
     * Commences performing this task, awaits its completion if
     * necessary, and returns its result, or throws an (unchecked)
     * {@code RuntimeException} or {@code Error} if the underlying
     * computation did so.
     *
     * @return the computed result
     */
    final V invoke() {
        int s;
        if (((s = doInvoke()) & ABNORMAL) != 0)
            reportException(s);

        static if(!is(V == void)) {
            return getRawResult();
        }
    }

    /**
     * Forks the given tasks, returning when {@code isDone} holds for
     * each task or an (unchecked) exception is encountered, in which
     * case the exception is rethrown. If more than one task
     * encounters an exception, then this method throws any one of
     * these exceptions. If any task encounters an exception, the
     * other may be cancelled. However, the execution status of
     * individual tasks is not guaranteed upon exceptional return. The
     * status of each task may be obtained using {@link
     * #getException()} and related methods to check if they have been
     * cancelled, completed normally or exceptionally, or left
     * unprocessed.
     *
     * @param t1 the first task
     * @param t2 the second task
     * @throws NullPointerException if any task is null
     */
    static void invokeAll(IForkJoinTask t1, IForkJoinTask t2) {
        int s1, s2;
        implementationMissing(false);
        // t2.fork();
        // if (((s1 = t1.doInvoke()) & ABNORMAL) != 0)
        //     t1.reportException(s1);
        // if (((s2 = t2.doJoin()) & ABNORMAL) != 0)
        //     t2.reportException(s2);
    }

    /**
     * Forks the given tasks, returning when {@code isDone} holds for
     * each task or an (unchecked) exception is encountered, in which
     * case the exception is rethrown. If more than one task
     * encounters an exception, then this method throws any one of
     * these exceptions. If any task encounters an exception, others
     * may be cancelled. However, the execution status of individual
     * tasks is not guaranteed upon exceptional return. The status of
     * each task may be obtained using {@link #getException()} and
     * related methods to check if they have been cancelled, completed
     * normally or exceptionally, or left unprocessed.
     *
     * @param tasks the tasks
     * @throws NullPointerException if any task is null
     */
    static void invokeAll(IForkJoinTask[] tasks...)
    {
        Throwable ex = null;
        int last = cast(int)tasks.length - 1;
        // for (int i = last; i >= 0; --i) {
        //     IForkJoinTask t = tasks[i];
        //     if (t is null) {
        //         if (ex is null)
        //             ex = new NullPointerException();
        //     }
        //     else if (i != 0)
        //         t.fork();
        //     else if ((t.doInvoke() & ABNORMAL) != 0 && ex is null)
        //         ex = t.getException();
        // }
        // for (int i = 1; i <= last; ++i) {
        //     IForkJoinTask t = tasks[i];
        //     if (t !is null) {
        //         if (ex !is null)
        //             t.cancel(false);
        //         else if ((t.doJoin() & ABNORMAL) != 0)
        //             ex = t.getException();
        //     }
        // }
        implementationMissing(false);
        if (ex !is null)
            ForkJoinTaskHelper.rethrow(ex);
    }

    /**
     * Forks all tasks in the specified collection, returning when
     * {@code isDone} holds for each task or an (unchecked) exception
     * is encountered, in which case the exception is rethrown. If
     * more than one task encounters an exception, then this method
     * throws any one of these exceptions. If any task encounters an
     * exception, others may be cancelled. However, the execution
     * status of individual tasks is not guaranteed upon exceptional
     * return. The status of each task may be obtained using {@link
     * #getException()} and related methods to check if they have been
     * cancelled, completed normally or exceptionally, or left
     * unprocessed.
     *
     * @param tasks the collection of tasks
     * @param (T) the type of the values returned from the tasks
     * @return the tasks argument, to simplify usage
     * @throws NullPointerException if tasks or any element are null
     */
    static Collection!(T) invokeAll(T)(Collection!(T) tasks) if(is(T : IForkJoinTask)) {
        // TODO: Tasks pending completion -@zxp at 12/21/2018, 10:36:15 PM
        //
        implementationMissing(false);
        // if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
        //     invokeAll(tasks.toArray(new IForkJoinTask[0]));
        //     return tasks;
        // }

        // List!(IForkJoinTask) ts = cast(List!(IForkJoinTask)) tasks;
        // Throwable ex = null;
        // int last = ts.size() - 1;
        // for (int i = last; i >= 0; --i) {
        //     IForkJoinTask t = ts.get(i);
        //     if (t is null) {
        //         if (ex is null)
        //             ex = new NullPointerException();
        //     }
        //     else if (i != 0)
        //         t.fork();
        //     else if ((t.doInvoke() & ABNORMAL) != 0 && ex is null)
        //         ex = t.getException();
        // }
        // for (int i = 1; i <= last; ++i) {
        //     IForkJoinTask t = ts.get(i);
        //     if (t !is null) {
        //         if (ex !is null)
        //             t.cancel(false);
        //         else if ((t.doJoin() & ABNORMAL) != 0)
        //             ex = t.getException();
        //     }
        // }
        // if (ex !is null)
        //     rethrow(ex);
        return tasks;
    }

    /**
     * Attempts to cancel execution of this task. This attempt will
     * fail if the task has already completed or could not be
     * cancelled for some other reason. If successful, and this task
     * has not started when {@code cancel} is called, execution of
     * this task is suppressed. After this method returns
     * successfully, unless there is an intervening call to {@link
     * #reinitialize}, subsequent calls to {@link #isCancelled},
     * {@link #isDone}, and {@code cancel} will return {@code true}
     * and calls to {@link #join} and related methods will result in
     * {@code CancellationException}.
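     *
     * <p>A minimal sketch of these guarantees, assuming {@code task}
     * is a hypothetical task that has not yet completed and is being
     * cancelled by another task:
     * <pre> {@code
     * if (task.cancel(false)) {     // the argument has no effect by default
     *     assert(task.isCancelled());
     *     assert(task.isDone());
     *     // task.join() would now result in CancellationException
     * }
     * }</pre>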
     *
     * <p>This method may be overridden in subclasses, but if so, must
     * still ensure that these properties hold. In particular, the
     * {@code cancel} method itself must not throw exceptions.
     *
     * <p>This method is designed to be invoked by <em>other</em>
     * tasks. To terminate the current task, you can just return or
     * throw an unchecked exception from its computation method, or
     * invoke {@link #completeExceptionally(Throwable)}.
     *
     * @param mayInterruptIfRunning this value has no effect in the
     * default implementation because interrupts are not used to
     * control cancellation.
     *
     * @return {@code true} if this task is now cancelled
     */
    bool cancel(bool mayInterruptIfRunning) {
        int s = abnormalCompletion(DONE | ABNORMAL);
        return (s & (ABNORMAL | THROWN)) == ABNORMAL;
    }

    final bool isDone() {
        return status < 0;
    }

    final bool isCancelled() {
        return (status & (ABNORMAL | THROWN)) == ABNORMAL;
    }

    /**
     * Returns {@code true} if this task threw an exception or was cancelled.
     *
     * @return {@code true} if this task threw an exception or was cancelled
     */
    final bool isCompletedAbnormally() {
        return (status & ABNORMAL) != 0;
    }

    /**
     * Returns {@code true} if this task completed without throwing an
     * exception and was not cancelled.
     *
     * @return {@code true} if this task completed without throwing an
     * exception and was not cancelled
     */
    final bool isCompletedNormally() {
        return (status & (DONE | ABNORMAL)) == DONE;
    }

    /**
     * Returns the exception thrown by the base computation, or a
     * {@code CancellationException} if cancelled, or {@code null} if
     * none or if the method has not yet completed.
     *
     * @return the exception, or {@code null} if none
     */
    final Throwable getException() {
        int s = status;
        return ((s & ABNORMAL) == 0 ? null :
                (s & THROWN) == 0 ?
                new CancellationException() :
                getThrowableException());
    }

    /**
     * Completes this task abnormally, and if not already aborted or
     * cancelled, causes it to throw the given exception upon
     * {@code join} and related operations. This method may be used
     * to induce exceptions in asynchronous tasks, or to force
     * completion of tasks that would not otherwise complete. Its use
     * in other situations is discouraged. This method is
     * overridable, but overridden versions must invoke {@code super}
     * implementation to maintain guarantees.
     *
     * @param ex the exception to throw. If this exception is not a
     * {@code RuntimeException} or {@code Error}, the actual exception
     * thrown will be a {@code RuntimeException} with cause {@code ex}.
     */
    void completeExceptionally(Exception ex) {
        RuntimeException re = cast(RuntimeException)ex;
        if(re !is null) {
            setExceptionalCompletion(ex);
        } else {
            Error er = cast(Error)ex;
            if(er is null) {
                setExceptionalCompletion(new RuntimeException(ex));
            } else {
                setExceptionalCompletion(ex);
            }
        }
    }

    /**
     * Completes this task, and if not already aborted or cancelled,
     * returning the given value as the result of subsequent
     * invocations of {@code join} and related operations. This method
     * may be used to provide results for asynchronous tasks, or to
     * provide alternative handling for tasks that would not otherwise
     * complete normally. Its use in other situations is
     * discouraged. This method is overridable, but overridden
     * versions must invoke {@code super} implementation to maintain
     * guarantees.
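     *
     * <p>A minimal sketch of the asynchronous use mentioned above,
     * assuming a hypothetical subclass whose {@code exec()} returns
     * {@code false} and whose result arrives later through a callback
     * ({@code AsyncTask} and {@code onReply} are illustrative names,
     * not part of this module):
     *
     * <pre> {@code
     * class AsyncTask : ForkJoinTask!(int) {
     *     private int result;
     *     override int getRawResult() { return result; }
     *     protected override void setRawResult(int v) { result = v; }
     *     // returning false leaves the task incomplete until complete() is called
     *     protected override bool exec() { return false; }
     *     // hypothetical callback: stores the value and marks the task done
     *     void onReply(int value) { complete(value); }
     * }}</pre>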
     *
     * @param value the result value for this task
     */
    static if(is(V == void)) {
        void complete() {
            // try {
            //     setRawResult();
            // } catch (Throwable rex) {
            //     setExceptionalCompletion(rex);
            //     return;
            // }
            setDone();
        }
    } else {
        void complete(V value) {
            try {
                setRawResult(value);
            } catch (Throwable rex) {
                setExceptionalCompletion(rex);
                return;
            }
            setDone();
        }
    }

    /**
     * Completes this task normally without setting a value. The most
     * recent value established by {@link #setRawResult} (or {@code
     * null} by default) will be returned as the result of subsequent
     * invocations of {@code join} and related operations.
     *
     */
    final void quietlyComplete() {
        setDone();
    }

    /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread is not a
     * member of a ForkJoinPool and was interrupted while waiting
     */
    final V get() {
        ForkJoinWorkerThread ft = cast(ForkJoinWorkerThread)Thread.getThis();
        int s = ft !is null ? doJoin() : externalInterruptibleAwaitDone();
        if ((s & THROWN) != 0)
            throw new ExecutionException(getThrowableException());
        else if ((s & ABNORMAL) != 0)
            throw new CancellationException();
        else {
            static if(!is(V == void)) {
                return getRawResult();
            }
        }
    }

    /**
     * Waits if necessary for at most the given time for the computation
     * to complete, and then retrieves its result, if available.
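     *
     * <p>A minimal sketch of a timed wait, assuming a hypothetical
     * forked task {@code t} of type {@code ForkJoinTask!(int)}
     * ({@code msecs} comes from {@code core.time}):
     *
     * <pre> {@code
     * try {
     *     int v = t.get(500.msecs);   // wait at most 500 ms
     * } catch (TimeoutException e) {
     *     t.cancel(false);            // one possible reaction: give up on the result
     * }}</pre>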
     *
     * @param timeout the maximum time to wait
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread is not a
     * member of a ForkJoinPool and was interrupted while waiting
     * @throws TimeoutException if the wait timed out
     */
    final V get(Duration timeout) {
        int s;
        // TODO: Tasks pending completion -@zxp at 12/21/2018, 10:55:12 PM
        //
        // if (Thread.interrupted())
        //     throw new InterruptedException();

        if ((s = status) >= 0 && timeout > Duration.zero) {
            MonoTime deadline = MonoTime.currTime + timeout;
            // long deadline = (d == 0L) ? 1L : d; // avoid 0
            ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)Thread.getThis();
            if (wt !is null) {
                s = wt.pool.awaitJoin(wt.workQueue, this, deadline);
            }
            else {
                ICountedCompleter ic = cast(ICountedCompleter)this;
                if(ic !is null) {
                    s = ForkJoinPool.common.externalHelpComplete(ic, 0);
                } else if(ForkJoinPool.common.tryExternalUnpush(this)){
                    s = doExec();
                } else
                    s = 0;

                if (s >= 0) {
                    Duration ns; // measure in nanosecs, but wait in millisecs
                    long ms;
                    while ((s = status) >= 0 &&
                           (ns = deadline - MonoTime.currTime) > Duration.zero) {
                        if ((ms = ns.total!(TimeUnit.Millisecond)()) > 0L) {
                            s = AtomicHelper.getAndBitwiseOr(this.status, SIGNAL);
                            if( s >= 0) {
                                synchronized (this) {
                                    if (status >= 0) // OK to throw InterruptedException
                                        thisLocker.wait(dur!(TimeUnit.Millisecond)(ms));
                                    else
                                        thisLocker.notifyAll();
                                }
                            }
                        }
                    }
                }
            }
        }
        if (s >= 0)
            throw new TimeoutException();
        else if ((s & THROWN) != 0)
            throw new ExecutionException(getThrowableException());
        else if ((s &
                ABNORMAL) != 0)
            throw new CancellationException();
        else {
            static if(!is(V == void)) {
                return getRawResult();
            }
        }
    }

    /**
     * Joins this task, without returning its result or throwing its
     * exception. This method may be useful when processing
     * collections of tasks when some have been cancelled or otherwise
     * known to have aborted.
     */
    final void quietlyJoin() {
        doJoin();
    }

    /**
     * Commences performing this task and awaits its completion if
     * necessary, without returning its result or throwing its
     * exception.
     */
    final void quietlyInvoke() {
        doInvoke();
    }

    /**
     * Possibly executes tasks until the pool hosting the current task
     * {@linkplain ForkJoinPool#isQuiescent is quiescent}.  This
     * method may be of use in designs in which many tasks are forked,
     * but none are explicitly joined, instead executing them until
     * all are processed.
     */
    // static void helpQuiesce() {
    //     Thread t;
    //     if ((t = Thread.getThis()) instanceof ForkJoinWorkerThread) {
    //         ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
    //         wt.pool.helpQuiescePool(wt.workQueue);
    //     }
    //     else
    //         ForkJoinPool.quiesceCommonPool();
    // }

    /**
     * Resets the internal bookkeeping state of this task, allowing a
     * subsequent {@code fork}. This method allows repeated reuse of
     * this task, but only if reuse occurs when this task has either
     * never been forked, or has been forked, then completed and all
     * outstanding joins of this task have also completed. Effects
     * under any other usage conditions are not guaranteed.
     * This method may be useful when executing
     * pre-constructed trees of subtasks in loops.
     *
     * <p>Upon completion of this method, {@code isDone()} reports
     * {@code false}, and {@code getException()} reports {@code
     * null}.
     * However, the value returned by {@code getRawResult} is
     * unaffected. To clear this value, you can invoke {@code
     * setRawResult(null)}.
     */
    void reinitialize() {
        if ((status & THROWN) != 0)
            clearExceptionalCompletion();
        else
            status = 0;
    }

    /**
     * Returns the pool hosting the current thread, or {@code null}
     * if the current thread is executing outside of any ForkJoinPool.
     *
     * <p>This method returns {@code null} if and only if {@link
     * #inForkJoinPool} returns {@code false}.
     *
     * @return the pool, or {@code null} if none
     */
    // static ForkJoinPool getPool() {
    //     Thread t = Thread.getThis();
    //     return (t instanceof ForkJoinWorkerThread) ?
    //         ((ForkJoinWorkerThread) t).pool : null;
    // }

    /**
     * Returns {@code true} if the current thread is a {@link
     * ForkJoinWorkerThread} executing as a ForkJoinPool computation.
     *
     * @return {@code true} if the current thread is a {@link
     * ForkJoinWorkerThread} executing as a ForkJoinPool computation,
     * or {@code false} otherwise
     */
    static bool inForkJoinPool() {
        ForkJoinWorkerThread t = cast(ForkJoinWorkerThread)Thread.getThis();
        return t !is null;
    }

    /**
     * Tries to unschedule this task for execution. This method will
     * typically (but is not guaranteed to) succeed if this task is
     * the most recently forked task by the current thread, and has
     * not commenced executing in another thread.  This method may be
     * useful when arranging alternative local processing of tasks
     * that could have been, but were not, stolen.
     *
     * @return {@code true} if unforked
     */
    bool tryUnfork() {
        ForkJoinWorkerThread t = cast(ForkJoinWorkerThread)Thread.getThis();
        return t !is null?
            t.workQueue.tryUnpush(this) :
            ForkJoinPool.common.tryExternalUnpush(this);
    }

    /**
     * Returns an estimate of the number of tasks that have been
     * forked by the current worker thread but not yet executed. This
     * value may be useful for heuristic decisions about whether to
     * fork other tasks.
     *
     * @return the number of tasks
     */
    // static int getQueuedTaskCount() {
    //     Thread t; ForkJoinPool.WorkQueue q;
    //     if ((t = Thread.getThis()) instanceof ForkJoinWorkerThread)
    //         q = ((ForkJoinWorkerThread)t).workQueue;
    //     else
    //         q = ForkJoinPool.commonSubmitterQueue();
    //     return (q is null) ? 0 : q.queueSize();
    // }

    /**
     * Returns an estimate of how many more locally queued tasks are
     * held by the current worker thread than there are other worker
     * threads that might steal them, or zero if this thread is not
     * operating in a ForkJoinPool. This value may be useful for
     * heuristic decisions about whether to fork other tasks. In many
     * usages of ForkJoinTasks, at steady state, each worker should
     * aim to maintain a small constant surplus (for example, 3) of
     * tasks, and to process computations locally if this threshold is
     * exceeded.
     *
     * @return the surplus number of tasks, which may be negative
     */
    // static int getSurplusQueuedTaskCount() {
    //     return ForkJoinPool.getSurplusQueuedTaskCount();
    // }

    // Extension methods
    static if(is(V == void)) {
        // protected abstract void setRawResult();
    } else {

        /**
         * Returns the result that would be returned by {@link #join}, even
         * if this task completed abnormally, or {@code null} if this task
         * is not known to have been completed.  This method is designed
         * to aid debugging, as well as to support extensions. Its use in
         * any other context is discouraged.
         *
         * @return the result, or {@code null} if not completed
         */
        abstract V getRawResult();

        /**
         * Forces the given value to be returned as a result.  This method
         * is designed to support extensions, and should not in general be
         * called otherwise.
         *
         * @param value the value
         */
        protected abstract void setRawResult(V value);
    }

    /**
     * Immediately performs the base action of this task and returns
     * true if, upon return from this method, this task is guaranteed
     * to have completed normally. This method may return false
     * otherwise, to indicate that this task is not necessarily
     * complete (or is not known to be complete), for example in
     * asynchronous actions that require explicit invocations of
     * completion methods. This method may also throw an (unchecked)
     * exception to indicate abnormal exit. This method is designed to
     * support extensions, and should not in general be called
     * otherwise.
     *
     * @return {@code true} if this task is known to have completed normally
     */
    protected abstract bool exec();

    /**
     * Returns, but does not unschedule or execute, a task queued by
     * the current thread but not yet executed, if one is immediately
     * available. There is no guarantee that this task will actually
     * be polled or executed next. Conversely, this method may return
     * null even if a task exists but cannot be accessed without
     * contention with other threads.  This method is designed
     * primarily to support extensions, and is unlikely to be useful
     * otherwise.
     *
     * @return the next task, or {@code null} if none are available
     */
    // protected static IForkJoinTask peekNextLocalTask() {
    //     Thread t; ForkJoinPool.WorkQueue q;
    //     if ((t = Thread.getThis()) instanceof ForkJoinWorkerThread)
    //         q = ((ForkJoinWorkerThread)t).workQueue;
    //     else
    //         q = ForkJoinPool.commonSubmitterQueue();
    //     return (q is null) ? null : q.peek();
    // }

    /**
     * Unschedules and returns, without executing, the next task
     * queued by the current thread but not yet executed, if the
     * current thread is operating in a ForkJoinPool.  This method is
     * designed primarily to support extensions, and is unlikely to be
     * useful otherwise.
     *
     * @return the next task, or {@code null} if none are available
     */
    // protected static IForkJoinTask pollNextLocalTask() {
    //     Thread t;
    //     return ((t = Thread.getThis()) instanceof ForkJoinWorkerThread) ?
    //         ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() :
    //         null;
    // }

    /**
     * If the current thread is operating in a ForkJoinPool,
     * unschedules and returns, without executing, the next task
     * queued by the current thread but not yet executed, if one is
     * available, or if not available, a task that was forked by some
     * other thread, if available. Availability may be transient, so a
     * {@code null} result does not necessarily imply quiescence of
     * the pool this task is operating in.  This method is designed
     * primarily to support extensions, and is unlikely to be useful
     * otherwise.
     *
     * @return a task, or {@code null} if none are available
     */
    // protected static IForkJoinTask pollTask() {
    //     Thread t; ForkJoinWorkerThread wt;
    //     return ((t = Thread.getThis()) instanceof ForkJoinWorkerThread) ?
    //         (wt = (ForkJoinWorkerThread)t).pool.nextTaskFor(wt.workQueue) :
    //         null;
    // }

    // /**
    //  * If the current thread is operating in a ForkJoinPool,
    //  * unschedules and returns, without executing, a task externally
    //  * submitted to the pool, if one is available. Availability may be
    //  * transient, so a {@code null} result does not necessarily imply
    //  * quiescence of the pool.  This method is designed primarily to
    //  * support extensions, and is unlikely to be useful otherwise.
    //  *
    //  * @return a task, or {@code null} if none are available
    //  */
    // protected static IForkJoinTask pollSubmission() {
    //     ForkJoinWorkerThread t = cast(ForkJoinWorkerThread)Thread.getThis();
    //     return t !is null ? t.pool.pollSubmission() : null;
    // }

    // tag operations

    /**
     * Returns the tag for this task.
     *
     * @return the tag for this task
     */
    final short getForkJoinTaskTag() {
        return cast(short)status;
    }

    /**
     * Atomically sets the tag value for this task and returns the old value.
     *
     * @param newValue the new tag value
     * @return the previous value of the tag
     */
    final short setForkJoinTaskTag(short newValue) {
        while(true) {
            int s = status;
            if(AtomicHelper.compareAndSet(this.status, s, (s & ~SMASK) | (newValue & SMASK)))
                return cast(short)s;
        }
        // return 0;
    }

    /**
     * Atomically conditionally sets the tag value for this task.
     * Among other applications, tags can be used as visit markers
     * in tasks operating on graphs, as in methods that check: {@code
     * if (task.compareAndSetForkJoinTaskTag(cast(short)0, cast(short)1))}
     * before processing, otherwise exiting because the node has
     * already been visited.
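     *
     * <p>A minimal sketch of that visit-marker pattern, assuming a
     * hypothetical {@code NodeTask} subclass of {@code ForkJoinTask!(void)}
     * with a {@code process} method that does the per-node work (both
     * names are illustrative only):
     *
     * <pre> {@code
     * void visit(NodeTask task) {
     *     // only the caller that flips the tag from 0 to 1 goes on to process the node
     *     if (!task.compareAndSetForkJoinTaskTag(cast(short)0, cast(short)1))
     *         return;              // already visited by another task
     *     task.process();
     * }}</pre>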
     *
     * @param expect the expected tag value
     * @param update the new tag value
     * @return {@code true} if successful; i.e., the current value was
     * equal to {@code expect} and was changed to {@code update}.
     */
    final bool compareAndSetForkJoinTaskTag(short expect, short update) {
        for (int s;;) {
            if (cast(short)(s = status) != expect)
                return false;
            if (AtomicHelper.compareAndSet(this.status, s,
                                         (s & ~SMASK) | (update & SMASK)))
                return true;
        }
    }


    /**
     * Returns a new {@code ForkJoinTask} that performs the {@code run}
     * method of the given {@code Runnable} as its action, and returns
     * a null result upon {@link #join}.
     *
     * @param runnable the runnable action
     * @return the task
     */
    // static IForkJoinTask adapt(Runnable runnable) {
    //     return new AdaptedRunnableAction(runnable);
    // }

    /**
     * Returns a new {@code ForkJoinTask} that performs the {@code run}
     * method of the given {@code Runnable} as its action, and returns
     * the given result upon {@link #join}.
     *
     * @param runnable the runnable action
     * @param result the result upon completion
     * @param (T) the type of the result
     * @return the task
     */
    static ForkJoinTask!(T) adapt(T)(Runnable runnable, T result) {
        return new AdaptedRunnable!(T)(runnable, result);
    }

    /**
     * Returns a new {@code ForkJoinTask} that performs the {@code call}
     * method of the given {@code Callable} as its action, and returns
     * its result upon {@link #join}, translating any checked exceptions
     * encountered into {@code RuntimeException}.
     *
     * @param callable the callable action
     * @param (T) the type of the callable's result
     * @return the task
     */
    static ForkJoinTask!(T) adapt(T)(Callable!(T) callable) {
        return new AdaptedCallable!(T)(callable);
    }
}



/**
 * Adapter for Runnables. This implements RunnableFuture
 * to be compliant with AbstractExecutorService constraints
 * when used in ForkJoinPool.
 */
final class AdaptedRunnable(T) : ForkJoinTask!(T), RunnableFuture!(T) {
    Runnable runnable;
    T result;
    this(Runnable runnable, T result) {
        if (runnable is null) throw new NullPointerException();
        this.runnable = runnable;
        this.result = result; // OK to set this even before completion
    }
    final override T getRawResult() { return result; }
    final override void setRawResult(T v) { result = v; }
    final override bool exec() { runnable.run(); return true; }
    final void run() { invoke(); }
    override string toString() {
        // An interface reference cannot be concatenated with a string directly
        return super.toString() ~ "[Wrapped task = " ~ (cast(Object)runnable).toString() ~ "]";
    }
}

/**
 * Adapter for Runnables without results.
 */
final class AdaptedRunnableAction : ForkJoinTask!(void), Runnable {
    Runnable runnable;
    this(Runnable runnable) {
        if (runnable is null) throw new NullPointerException();
        this.runnable = runnable;
    }
    // final Void getRawResult() { return null; }
    // final void setRawResult(Void v) { }
    final override bool exec() { runnable.run(); return true; }
    final void run() { invoke(); }
    override bool cancel(bool mayInterruptIfRunning) {
        return super.cancel(mayInterruptIfRunning);
    }

    // override bool isCancelled() {
    //     return super.isCancelled();
    // }

    // override bool isDone() {
    //     return super.isDone();
    // }

    // override void get() {
    //     super.get();
    // }

    // override void get(Duration timeout) {
    //     super.get(timeout);
    // }

    override string toString() {
        return super.toString() ~ "[Wrapped task = " ~ (cast(Object)runnable).toString() ~ "]";
    }
}

/**
 * Adapter for Runnables in which failure forces worker exception.
 */
final class RunnableExecuteAction : ForkJoinTask!(void) {
    Runnable runnable;
    this(Runnable runnable) {
        if (runnable is null) throw new NullPointerException();
        this.runnable = runnable;
    }
    // final Void getRawResult() { return null; }
    // final void setRawResult(Void v) { }
    final override bool exec() { runnable.run(); return true; }
    override void internalPropagateException(Throwable ex) {
        ForkJoinTaskHelper.rethrow(ex); // rethrow outside exec() catches.
    }
}

/**
 * Adapter for Callables.
 */
final class AdaptedCallable(T) : ForkJoinTask!(T), RunnableFuture!(T) {
    Callable!(T) callable;
    T result;
    this(Callable!(T) callable) {
        if (callable is null) throw new NullPointerException();
        this.callable = callable;
    }
    final override T getRawResult() { return result; }
    final override void setRawResult(T v) { result = v; }
    final override bool exec() {
        try {
            result = callable.call();
            return true;
        } catch (RuntimeException rex) {
            throw rex;
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }
    final void run() { invoke(); }
    override string toString() {
        // An interface reference cannot be concatenated with a string directly
        return super.toString() ~ "[Wrapped task = " ~ (cast(Object)callable).toString() ~ "]";
    }
}


/*************************************************/
// CountedCompleter
/*************************************************/

interface ICountedCompleter : IForkJoinTask {
    ICountedCompleter getCompleter();
}

/**
 * A {@link ForkJoinTask} with a completion action performed when
 * triggered and there are no remaining pending actions.
 * CountedCompleters are in general more robust in the
 * presence of subtask stalls and blockage than are other forms of
 * ForkJoinTasks, but are less intuitive to program.  Uses of
 * CountedCompleter are similar to those of other completion based
 * components (such as {@link java.nio.channels.CompletionHandler})
 * except that multiple <em>pending</em> completions may be necessary
 * to trigger the completion action {@link #onCompletion(CountedCompleter)},
 * not just one.
 * Unless initialized otherwise, the {@linkplain #getPendingCount pending
 * count} starts at zero, but may be (atomically) changed using
 * methods {@link #setPendingCount}, {@link #addToPendingCount}, and
 * {@link #compareAndSetPendingCount}.
 * Upon invocation of {@link
 * #tryComplete}, if the pending action count is nonzero, it is
 * decremented; otherwise, the completion action is performed, and if
 * this completer itself has a completer, the process is continued
 * with its completer.  As is the case with related synchronization
 * components such as {@link Phaser} and {@link Semaphore}, these methods
 * affect only internal counts; they do not establish any further
 * internal bookkeeping. In particular, the identities of pending
 * tasks are not maintained. As illustrated below, you can create
 * subclasses that do record some or all pending tasks or their
 * results when needed.  As illustrated below, utility methods
 * supporting customization of completion traversals are also
 * provided. However, because CountedCompleters provide only basic
 * synchronization mechanisms, it may be useful to create further
 * abstract subclasses that maintain linkages, fields, and additional
 * support methods appropriate for a set of related usages.
 *
 * <p>A concrete CountedCompleter class must define method {@link
 * #compute}, that should in most cases (as illustrated below), invoke
 * {@code tryComplete()} once before returning. The class may also
 * optionally override method {@link #onCompletion(CountedCompleter)}
 * to perform an action upon normal completion, and method
 * {@link #onExceptionalCompletion(Throwable, CountedCompleter)} to
 * perform an action upon any exception.
 *
 * <p>CountedCompleters most often do not bear results, in which case
 * they are normally declared as {@code CountedCompleter!(void)}, and
 * will always return {@code null} as a result value.  In other cases,
 * you should override method {@link #getRawResult} to provide a
 * result from {@code join(), invoke()}, and related methods.
 * In general, this method should return the value of a field (or a
 * function of one or more fields) of the CountedCompleter object that
 * holds the result upon completion. Method {@link #setRawResult} by
 * default plays no role in CountedCompleters.  It is possible, but
 * rarely applicable, to override this method to maintain other
 * objects or fields holding result data.
 *
 * <p>A CountedCompleter that does not itself have a completer (i.e.,
 * one for which {@link #getCompleter} returns {@code null}) can be
 * used as a regular ForkJoinTask with this added functionality.
 * However, any completer that in turn has another completer serves
 * only as an internal helper for other computations, so its own task
 * status (as reported in methods such as {@link ForkJoinTask#isDone})
 * is arbitrary; this status changes only upon explicit invocations of
 * {@link #complete}, {@link ForkJoinTask#cancel},
 * {@link ForkJoinTask#completeExceptionally(Throwable)} or upon
 * exceptional completion of method {@code compute}. Upon any
 * exceptional completion, the exception may be relayed to a task's
 * completer (and its completer, and so on), if one exists and it has
 * not otherwise already completed. Similarly, cancelling an internal
 * CountedCompleter has only a local effect on that completer, so is
 * not often useful.
 *
 * <p><b>Sample Usages.</b>
 *
 * <p><b>Parallel recursive decomposition.</b> CountedCompleters may
 * be arranged in trees similar to those often used with {@link
 * RecursiveAction}s, although the constructions involved in setting
 * them up typically vary. Here, the completer of each task is its
 * parent in the computation tree.
 * Even though they entail a bit more
 * bookkeeping, CountedCompleters may be better choices when applying
 * a possibly time-consuming operation (that cannot be further
 * subdivided) to each element of an array or collection; especially
 * when the operation takes a significantly different amount of time
 * to complete for some elements than others, either because of
 * intrinsic variation (for example I/O) or auxiliary effects such as
 * garbage collection.  Because CountedCompleters provide their own
 * continuations, other tasks need not block waiting to perform them.
 *
 * <p>For example, here is an initial version of a utility method that
 * uses divide-by-two recursive decomposition to divide work into
 * single pieces (leaf tasks). Even when work is split into individual
 * calls, tree-based techniques are usually preferable to directly
 * forking leaf tasks, because they reduce inter-thread communication
 * and improve load balancing. In the recursive case, the second of
 * each pair of subtasks to finish triggers completion of their parent
 * (because no result combination is performed, the default no-op
 * implementation of method {@code onCompletion} is not overridden).
 * The utility method sets up the root task and invokes it (here,
 * implicitly using the {@link ForkJoinPool#commonPool()}).  It is
 * straightforward and reliable (but not optimal) to always set the
 * pending count to the number of child tasks and call {@code
 * tryComplete()} immediately before returning.
 *
 * <pre> {@code
 * static <E> void forEach(E[] array, Consumer<E> action) {
 *   class Task extends CountedCompleter!(void) {
 *     final int lo, hi;
 *     Task(Task parent, int lo, int hi) {
 *       super(parent); this.lo = lo; this.hi = hi;
 *     }
 *
 *     void compute() {
 *       if (hi - lo >= 2) {
 *         int mid = (lo + hi) >>> 1;
 *         // must set pending count before fork
 *         setPendingCount(2);
 *         new Task(this, mid, hi).fork(); // right child
 *         new Task(this, lo, mid).fork(); // left child
 *       }
 *       else if (hi > lo)
 *         action.accept(array[lo]);
 *       tryComplete();
 *     }
 *   }
 *   new Task(null, 0, array.length).invoke();
 * }}</pre>
 *
 * This design can be improved by noticing that in the recursive case,
 * the task has nothing to do after forking its right task, so can
 * directly invoke its left task before returning. (This is an analog
 * of tail recursion removal.)  Also, when the last action in a task
 * is to fork or invoke a subtask (a "tail call"), the call to {@code
 * tryComplete()} can be optimized away, at the cost of making the
 * pending count look "off by one".
 *
 * <pre> {@code
 *     void compute() {
 *       if (hi - lo >= 2) {
 *         int mid = (lo + hi) >>> 1;
 *         setPendingCount(1); // looks off by one, but correct!
 *         new Task(this, mid, hi).fork(); // right child
 *         new Task(this, lo, mid).compute(); // direct invoke
 *       } else {
 *         if (hi > lo)
 *           action.accept(array[lo]);
 *         tryComplete();
 *       }
 *     }}</pre>
 *
 * As a further optimization, notice that the left task need not even exist.
 * Instead of creating a new one, we can continue using the original task,
 * and add a pending count for each fork.
 * Additionally, because no task
 * in this tree implements an {@link #onCompletion(CountedCompleter)} method,
 * {@code tryComplete} can be replaced with {@link #propagateCompletion}.
 *
 * <pre> {@code
 *     void compute() {
 *       int n = hi - lo;
 *       for (; n >= 2; n /= 2) {
 *         addToPendingCount(1);
 *         new Task(this, lo + n/2, lo + n).fork();
 *       }
 *       if (n > 0)
 *         action.accept(array[lo]);
 *       propagateCompletion();
 *     }}</pre>
 *
 * When pending counts can be precomputed, they can be established in
 * the constructor:
 *
 * <pre> {@code
 * static <E> void forEach(E[] array, Consumer<E> action) {
 *   class Task extends CountedCompleter!(void) {
 *     final int lo, hi;
 *     Task(Task parent, int lo, int hi) {
 *       super(parent, 31 - Integer.numberOfLeadingZeros(hi - lo));
 *       this.lo = lo; this.hi = hi;
 *     }
 *
 *     void compute() {
 *       for (int n = hi - lo; n >= 2; n /= 2)
 *         new Task(this, lo + n/2, lo + n).fork();
 *       action.accept(array[lo]);
 *       propagateCompletion();
 *     }
 *   }
 *   if (array.length > 0)
 *     new Task(null, 0, array.length).invoke();
 * }}</pre>
 *
 * Additional optimizations of such classes might entail specializing
 * classes for leaf steps, subdividing by say, four, instead of two
 * per iteration, and using an adaptive threshold instead of always
 * subdividing down to single elements.
 *
 * <p><b>Searching.</b> A tree of CountedCompleters can search for a
 * value or property in different parts of a data structure, and
 * report a result in an {@link
 * hunt.concurrency.atomic.AtomicReference AtomicReference} as
 * soon as one is found. The others can poll the result to avoid
 * unnecessary work.
 * (You could additionally {@linkplain #cancel
 * cancel} other tasks, but it is usually simpler and more efficient
 * to just let them notice that the result is set and if so skip
 * further processing.) Illustrating again with an array using full
 * partitioning (again, in practice, leaf tasks will almost always
 * process more than one element):
 *
 * <pre> {@code
 * class Searcher<E> extends CountedCompleter<E> {
 *   final E[] array; final AtomicReference<E> result; final int lo, hi;
 *   Searcher(ICountedCompleter p, E[] array, AtomicReference<E> result, int lo, int hi) {
 *     super(p);
 *     this.array = array; this.result = result; this.lo = lo; this.hi = hi;
 *   }
 *   E getRawResult() { return result.get(); }
 *   void compute() { // similar to ForEach version 3
 *     int l = lo, h = hi;
 *     while (result.get() is null && h >= l) {
 *       if (h - l >= 2) {
 *         int mid = (l + h) >>> 1;
 *         addToPendingCount(1);
 *         new Searcher(this, array, result, mid, h).fork();
 *         h = mid;
 *       }
 *       else {
 *         E x = array[l];
 *         if (matches(x) && result.compareAndSet(null, x))
 *           quietlyCompleteRoot(); // root task is now joinable
 *         break;
 *       }
 *     }
 *     tryComplete(); // normally complete whether or not found
 *   }
 *   bool matches(E e) { ... } // return true if found
 *
 *   static <E> E search(E[] array) {
 *       return new Searcher<E>(null, array, new AtomicReference<E>(), 0, array.length).invoke();
 *   }
 * }}</pre>
 *
 * In this example, as well as others in which tasks have no other
 * effects except to {@code compareAndSet} a common result, the
 * trailing unconditional invocation of {@code tryComplete} could be
 * made conditional ({@code if (result.get() is null) tryComplete();})
 * because no further bookkeeping is required to manage completions
 * once the root task completes.
 *
 * <p><b>Recording subtasks.</b> CountedCompleter tasks that combine
 * results of multiple subtasks usually need to access these results
 * in method {@link #onCompletion(CountedCompleter)}. As illustrated in the following
 * class (that performs a simplified form of map-reduce where mappings
 * and reductions are all of type {@code E}), one way to do this in
 * divide and conquer designs is to have each subtask record its
 * sibling, so that it can be accessed in method {@code onCompletion}.
 * This technique applies to reductions in which the order of
 * combining left and right results does not matter; ordered
 * reductions require explicit left/right designations. Variants of
 * other streamlinings seen in the above examples may also apply.
 *
 * <pre> {@code
 * class MyMapper<E> { E apply(E v) { ... } }
 * class MyReducer<E> { E apply(E x, E y) { ... } }
 * class MapReducer<E> extends CountedCompleter<E> {
 *   final E[] array; final MyMapper<E> mapper;
 *   final MyReducer<E> reducer; final int lo, hi;
 *   MapReducer<E> sibling;
 *   E result;
 *   MapReducer(ICountedCompleter p, E[] array, MyMapper<E> mapper,
 *              MyReducer<E> reducer, int lo, int hi) {
 *     super(p);
 *     this.array = array; this.mapper = mapper;
 *     this.reducer = reducer; this.lo = lo; this.hi = hi;
 *   }
 *   void compute() {
 *     if (hi - lo >= 2) {
 *       int mid = (lo + hi) >>> 1;
 *       MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid);
 *       MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi);
 *       left.sibling = right;
 *       right.sibling = left;
 *       setPendingCount(1); // only right is pending
 *       right.fork();
 *       left.compute();     // directly execute left
 *     }
 *     else {
 *       if (hi > lo)
 *           result = mapper.apply(array[lo]);
 *       tryComplete();
 *     }
 *   }
 *   void onCompletion(ICountedCompleter
 *       caller) {
 *     if (caller != this) {
 *       MapReducer<E> child = (MapReducer<E>)caller;
 *       MapReducer<E> sib = child.sibling;
 *       if (sib is null || sib.result is null)
 *         result = child.result;
 *       else
 *         result = reducer.apply(child.result, sib.result);
 *     }
 *   }
 *   E getRawResult() { return result; }
 *
 *   static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) {
 *     return new MapReducer<E>(null, array, mapper, reducer,
 *                              0, array.length).invoke();
 *   }
 * }}</pre>
 *
 * Here, method {@code onCompletion} takes a form common to many
 * completion designs that combine results. This callback-style method
 * is triggered once per task, in either of the two different contexts
 * in which the pending count is, or becomes, zero: (1) by a task
 * itself, if its pending count is zero upon invocation of {@code
 * tryComplete}, or (2) by any of its subtasks when they complete and
 * decrement the pending count to zero. The {@code caller} argument
 * distinguishes cases. Most often, when the caller is {@code this},
 * no action is necessary. Otherwise the caller argument can be used
 * (usually via a cast) to supply a value (and/or links to other
 * values) to be combined. Assuming proper use of pending counts, the
 * actions inside {@code onCompletion} occur (once) upon completion of
 * a task and its subtasks. No additional synchronization is required
 * within this method to ensure thread safety of accesses to fields of
 * this task or other completed tasks.
 *
 * <p><b>Completion Traversals.</b> If using {@code onCompletion} to
 * process completions is inapplicable or inconvenient, you can use
 * methods {@link #firstComplete} and {@link #nextComplete} to create
 * custom traversals.
 * For example, to define a MapReducer that only
 * splits out right-hand tasks in the form of the third ForEach
 * example, the completions must cooperatively reduce along
 * unexhausted subtask links, which can be done as follows:
 *
 * <pre> {@code
 * class MapReducer<E> extends CountedCompleter<E> { // version 2
 *   final E[] array; final MyMapper<E> mapper;
 *   final MyReducer<E> reducer; final int lo, hi;
 *   MapReducer<E> forks, next; // record subtask forks in list
 *   E result;
 *   MapReducer(ICountedCompleter p, E[] array, MyMapper<E> mapper,
 *              MyReducer<E> reducer, int lo, int hi, MapReducer<E> next) {
 *     super(p);
 *     this.array = array; this.mapper = mapper;
 *     this.reducer = reducer; this.lo = lo; this.hi = hi;
 *     this.next = next;
 *   }
 *   void compute() {
 *     int l = lo, h = hi;
 *     while (h - l >= 2) {
 *       int mid = (l + h) >>> 1;
 *       addToPendingCount(1);
 *       (forks = new MapReducer(this, array, mapper, reducer, mid, h, forks)).fork();
 *       h = mid;
 *     }
 *     if (h > l)
 *       result = mapper.apply(array[l]);
 *     // process completions by reducing along and advancing subtask links
 *     for (ICountedCompleter c = firstComplete(); c !is null; c = c.nextComplete()) {
 *       for (MapReducer t = (MapReducer)c, s = t.forks; s !is null; s = t.forks = s.next)
 *         t.result = reducer.apply(t.result, s.result);
 *     }
 *   }
 *   E getRawResult() { return result; }
 *
 *   static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) {
 *     return new MapReducer<E>(null, array, mapper, reducer,
 *                              0, array.length, null).invoke();
 *   }
 * }}</pre>
 *
 * <p><b>Triggers.</b> Some CountedCompleters are themselves never
 * forked, but instead serve as bits of plumbing in other designs,
 * including those in which the completion of one or more async tasks
 * triggers another async task.
 * For example:
 *
 * <pre> {@code
 * class HeaderBuilder extends CountedCompleter<...> { ... }
 * class BodyBuilder extends CountedCompleter<...> { ... }
 * class PacketSender extends CountedCompleter<...> {
 *   PacketSender(...) { super(null, 1); ... } // trigger on second completion
 *   void compute() { } // never called
 *   void onCompletion(ICountedCompleter caller) { sendPacket(); }
 * }
 * // sample use:
 * PacketSender p = new PacketSender();
 * new HeaderBuilder(p, ...).fork();
 * new BodyBuilder(p, ...).fork();}</pre>
 *
 * @author Doug Lea
 */
abstract class CountedCompleter(T) : ForkJoinTask!(T), ICountedCompleter {

    /** This task's completer, or null if none */
    ICountedCompleter completer;
    /** The number of pending tasks until completion */
    int pending;

    /**
     * Creates a new CountedCompleter with the given completer
     * and initial pending count.
     *
     * @param completer this task's completer, or {@code null} if none
     * @param initialPendingCount the initial pending count
     */
    protected this(ICountedCompleter completer,
                   int initialPendingCount) {
        this.completer = completer;
        this.pending = initialPendingCount;
    }

    /**
     * Creates a new CountedCompleter with the given completer
     * and an initial pending count of zero.
     *
     * @param completer this task's completer, or {@code null} if none
     */
    protected this(ICountedCompleter completer) {
        this.completer = completer;
    }

    /**
     * Creates a new CountedCompleter with no completer
     * and an initial pending count of zero.
     */
    protected this() {
        this.completer = null;
    }

    /**
     * The main computation performed by this task.
     */
    abstract void compute();

    /**
     * Performs an action when method {@link #tryComplete} is invoked
     * and the pending count is zero, or when the unconditional
     * method {@link #complete} is invoked. By default, this method
     * does nothing. You can distinguish cases by checking the
     * identity of the given caller argument. If not equal to {@code
     * this}, then it is typically a subtask that may contain results
     * (and/or links to other results) to combine.
     *
     * @param caller the task invoking this method (which may
     * be this task itself)
     */
    void onCompletion(ICountedCompleter caller) {
    }

    /**
     * Performs an action when method {@link
     * #completeExceptionally(Throwable)} is invoked or method {@link
     * #compute} throws an exception, and this task has not already
     * otherwise completed normally. On entry to this method, this task
     * {@link ForkJoinTask#isCompletedAbnormally}. The return value
     * of this method controls further propagation: If {@code true}
     * and this task has a completer that has not completed, then that
     * completer is also completed exceptionally, with the same
     * exception as this completer. The default implementation of
     * this method does nothing except return {@code true}.
     *
     * @param ex the exception
     * @param caller the task invoking this method (which may
     * be this task itself)
     * @return {@code true} if this exception should be propagated to this
     * task's completer, if one exists
     */
    bool onExceptionalCompletion(Throwable ex, ICountedCompleter caller) {
        return true;
    }

    /**
     * Returns the completer established in this task's constructor,
     * or {@code null} if none.
     *
     * @return the completer
     */
    final ICountedCompleter getCompleter() {
        return completer;
    }

    /**
     * Returns the current pending count.
     *
     * @return the current pending count
     */
    final int getPendingCount() {
        return pending;
    }

    /**
     * Sets the pending count to the given value.
     *
     * @param count the count
     */
    final void setPendingCount(int count) {
        pending = count;
    }

    /**
     * Adds (atomically) the given value to the pending count.
     *
     * @param delta the value to add
     */
    final void addToPendingCount(int delta) {
        PENDING.getAndAdd(this, delta);
    }

    /**
     * Sets (atomically) the pending count to the given count only if
     * it currently holds the given expected value.
     *
     * @param expected the expected value
     * @param count the new value
     * @return {@code true} if successful
     */
    final bool compareAndSetPendingCount(int expected, int count) {
        return PENDING.compareAndSet(this, expected, count);
    }

    /**
     * If the pending count is nonzero, (atomically) decrements it.
     *
     * @return the initial (undecremented) pending count holding on entry
     * to this method
     */
    final int decrementPendingCountUnlessZero() {
        int c;
        do {} while ((c = pending) != 0 &&
                     !PENDING.weakCompareAndSet(this, c, c - 1));
        return c;
    }

    /**
     * Returns the root of the current computation; i.e., this
     * task if it has no completer, else its completer's root.
     *
     * @return the root of the current computation
     */
    final ICountedCompleter getRoot() {
        ICountedCompleter a = this, p;
        while ((p = a.completer) !is null)
            a = p;
        return a;
    }

    /**
     * If the pending count is nonzero, decrements the count;
     * otherwise invokes {@link #onCompletion(CountedCompleter)}
     * and then similarly tries to complete this task's completer,
     * if one exists, else marks this task as complete.
     */
    final void tryComplete() {
        ICountedCompleter a = this, s = a;
        for (int c;;) {
            if ((c = a.pending) == 0) {
                a.onCompletion(s);
                if ((a = (s = a).completer) is null) {
                    s.quietlyComplete();
                    return;
                }
            }
            else if (PENDING.weakCompareAndSet(a, c, c - 1))
                return;
        }
    }

    /**
     * Equivalent to {@link #tryComplete} but does not invoke {@link
     * #onCompletion(CountedCompleter)} along the completion path:
     * If the pending count is nonzero, decrements the count;
     * otherwise, similarly tries to complete this task's completer, if
     * one exists, else marks this task as complete. This method may be
     * useful in cases where {@code onCompletion} should not, or need
     * not, be invoked for each completer in a computation.
     */
    final void propagateCompletion() {
        ICountedCompleter a = this, s;
        for (int c;;) {
            if ((c = a.pending) == 0) {
                if ((a = (s = a).completer) is null) {
                    s.quietlyComplete();
                    return;
                }
            }
            else if (PENDING.weakCompareAndSet(a, c, c - 1))
                return;
        }
    }

    /**
     * Regardless of pending count, invokes
     * {@link #onCompletion(CountedCompleter)}, marks this task as
     * complete and further triggers {@link #tryComplete} on this
     * task's completer, if one exists. The given rawResult is
     * used as an argument to {@link #setRawResult} before invoking
     * {@link #onCompletion(CountedCompleter)} or marking this task
     * as complete; its value is meaningful only for classes
     * overriding {@code setRawResult}. This method does not modify
     * the pending count.
     *
     * <p>This method may be useful when forcing completion as soon as
     * any one (versus all) of several subtask results are obtained.
     * However, in the common (and recommended) case in which {@code
     * setRawResult} is not overridden, this effect can be obtained
     * more simply using {@link #quietlyCompleteRoot()}.
     *
     * @param rawResult the raw result
     */
    void complete(T rawResult) {
        ICountedCompleter p;
        setRawResult(rawResult);
        onCompletion(this);
        quietlyComplete();
        if ((p = completer) !is null)
            p.tryComplete();
    }

    /**
     * If this task's pending count is zero, returns this task;
     * otherwise decrements its pending count and returns {@code null}.
     * This method is designed to be used with {@link #nextComplete} in
     * completion traversal loops.
     *
     * @return this task, if pending count was zero, else {@code null}
     */
    final ICountedCompleter firstComplete() {
        for (int c;;) {
            if ((c = pending) == 0)
                return this;
            else if (PENDING.weakCompareAndSet(this, c, c - 1))
                return null;
        }
    }

    /**
     * If this task does not have a completer, invokes {@link
     * ForkJoinTask#quietlyComplete} and returns {@code null}. Or, if
     * the completer's pending count is non-zero, decrements that
     * pending count and returns {@code null}. Otherwise, returns the
     * completer. This method can be used as part of a completion
     * traversal loop for homogeneous task hierarchies:
     *
     * <pre> {@code
     * for (ICountedCompleter c = firstComplete();
     *      c !is null;
     *      c = c.nextComplete()) {
     *   // ... process c ...
     * }}</pre>
     *
     * @return the completer, or {@code null} if none
     */
    final ICountedCompleter nextComplete() {
        ICountedCompleter p;
        if ((p = completer) !is null)
            return p.firstComplete();
        else {
            quietlyComplete();
            return null;
        }
    }

    /**
     * Equivalent to {@code getRoot().quietlyComplete()}.
     */
    final void quietlyCompleteRoot() {
        for (ICountedCompleter a = this, p;;) {
            if ((p = a.completer) is null) {
                a.quietlyComplete();
                return;
            }
            a = p;
        }
    }

    /**
     * If this task has not completed, attempts to process at most the
     * given number of other unprocessed tasks for which this task is
     * on the completion path, if any are known to exist.
     *
     * @param maxTasks the maximum number of tasks to process. If
     *                 less than or equal to zero, then no tasks are
     *                 processed.
     */
    final void helpComplete(int maxTasks) {
        Thread t = Thread.getThis();
        ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)t;
        if (maxTasks > 0 && status >= 0) {
            if (wt !is null)
                wt.pool.helpComplete(wt.workQueue, this, maxTasks);
            else
                ForkJoinPool.common.externalHelpComplete(this, maxTasks);
        }
    }

    /**
     * Supports ForkJoinTask exception propagation.
     */
    void internalPropagateException(Throwable ex) {
        ICountedCompleter a = this, s = a;
        while (a.onExceptionalCompletion(ex, s) &&
               (a = (s = a).completer) !is null && a.status >= 0 &&
               isExceptionalStatus(a.recordExceptionalCompletion(ex))) {

        }
    }

    /**
     * Implements execution conventions for CountedCompleters.
     */
    protected final bool exec() {
        compute();
        return false;
    }

    /**
     * Returns the result of the computation. By default,
     * returns {@code null}, which is appropriate for {@code Void}
     * actions, but in other cases should be overridden, almost
     * always to return a field or function of a field that
     * holds the result upon completion.
     *
     * @return the result of the computation
     */
    T getRawResult() { return null; }

    /**
     * A method that result-bearing CountedCompleters may optionally
     * use to help maintain result data. By default, does nothing.
     * Overrides are not recommended. However, if this method is
     * overridden to update existing objects or fields, then it must
     * in general be defined to be thread-safe.
     */
    protected void setRawResult(T t) { }

    // VarHandle mechanics
    // private static final VarHandle PENDING;
    // static {
    //     try {
    //         MethodHandles.Lookup l = MethodHandles.lookup();
    //         PENDING = l.findVarHandle(CountedCompleter.class, "pending", int.class);

    //     } catch (ReflectiveOperationException e) {
    //         throw new ExceptionInInitializerError(e);
    //     }
    // }
}


/**
 * A recursive result-bearing {@link ForkJoinTask}.
 *
 * <p>For a classic example, here is a task computing Fibonacci numbers:
 *
 * <pre> {@code
 * class Fibonacci extends RecursiveTask<Integer> {
 *   final int n;
 *   Fibonacci(int n) { this.n = n; }
 *   protected Integer compute() {
 *     if (n <= 1)
 *       return n;
 *     Fibonacci f1 = new Fibonacci(n - 1);
 *     f1.fork();
 *     Fibonacci f2 = new Fibonacci(n - 2);
 *     return f2.compute() + f1.join();
 *   }
 * }}</pre>
 *
 * However, besides being a dumb way to compute Fibonacci numbers
 * (there is a simple fast linear algorithm that you'd use in
 * practice), this is likely to perform poorly because the smallest
 * subtasks are too small to be worthwhile splitting up. Instead, as
 * is the case for nearly all fork/join applications, you'd pick some
 * minimum granularity size (for example 10 here) for which you always
 * sequentially solve rather than subdividing.
 *
 * @author Doug Lea
 */
abstract class RecursiveTask(V) : ForkJoinTask!V {

    /**
     * The result of the computation.
     */
    V result;

    /**
     * The main computation performed by this task.
     * @return the result of the computation
     */
    protected abstract V compute();

    final override V getRawResult() {
        return result;
    }

    protected final override void setRawResult(V value) {
        result = value;
    }

    /**
     * Implements execution conventions for RecursiveTask.
     */
    protected final override bool exec() {
        result = compute();
        return true;
    }

}
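
/*
 * Illustrative sketch, not part of the original port. The RecursiveTask
 * documentation above recommends picking a minimum granularity below which
 * a task solves its problem sequentially instead of subdividing. The class
 * below applies that advice to the Fibonacci example, written in D rather
 * than in the Java syntax of the documentation.
 *
 * Assumptions: the names ThresholdFibonacci, THRESHOLD and sequentialFib are
 * hypothetical, the cutoff of 10 is simply the value the text mentions, and
 * ForkJoinTask is assumed to provide fork() and join() as its documentation
 * describes. Only the sequential path is exercised in the unittest, so the
 * forking branch is untested here.
 */
version(unittest) {
    final class ThresholdFibonacci : RecursiveTask!int {
        /// Hypothetical cutoff: problems of this size or smaller are not split.
        enum int THRESHOLD = 10;

        private int n;

        this(int n) { this.n = n; }

        /// Linear-time loop used for problems at or below the cutoff.
        static int sequentialFib(int n) {
            int a = 0, b = 1;
            foreach (i; 0 .. n) {
                immutable int next = a + b;
                a = b;
                b = next;
            }
            return a;
        }

        protected override int compute() {
            if (n <= THRESHOLD)
                return sequentialFib(n);      // too small to be worth splitting
            auto f1 = new ThresholdFibonacci(n - 1);
            f1.fork();                        // schedule one half asynchronously
            auto f2 = new ThresholdFibonacci(n - 2);
            return f2.compute() + f1.join();  // run the other half in this thread
        }
    }
}

unittest {
    // Both checks stay at the cutoff, so no pool or worker thread is involved.
    assert(ThresholdFibonacci.sequentialFib(10) == 55);
    assert(new ThresholdFibonacci(10).compute() == 55);
}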