1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 
28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 module hunt.concurrency.CompletableFuture; 37 38 import hunt.concurrency.CompletionStage; 39 import hunt.concurrency.Delayed; 40 import hunt.concurrency.Exceptions; 41 import hunt.concurrency.ForkJoinPool; 42 import hunt.concurrency.ForkJoinTask; 43 import hunt.concurrency.Future; 44 import hunt.concurrency.Promise; 45 import hunt.concurrency.ScheduledThreadPoolExecutor; 46 import hunt.concurrency.thread; 47 import hunt.concurrency.ThreadFactory; 48 import hunt.concurrency.atomic.AtomicHelper; 49 50 import hunt.Exceptions; 51 import hunt.Functions; 52 import hunt.util.Common; 53 import hunt.util.DateTime; 54 import hunt.util.ObjectUtils; 55 import hunt.util.Runnable; 56 57 import hunt.logging.ConsoleLogger; 58 59 import core.thread; 60 import core.time; 61 import std.conv; 62 import std.concurrency : initOnce; 63 64 65 // Modes for Completion.tryFire. Signedness matters. 66 enum int SYNC = 0; 67 enum int ASYNC = 1; 68 enum int NESTED = -1; 69 70 71 /** The encoding of the null value. */ 72 __gshared AltResult NIL; // = new AltResult(null); 73 74 /* ------------- Async task preliminaries -------------- */ 75 76 private static bool USE_COMMON_POOL() { 77 return ForkJoinPool.getCommonPoolParallelism() > 1; 78 } 79 80 /** 81 * Default executor -- ForkJoinPool.commonPool() unless it cannot 82 * support parallelism. 
83 */ 84 private static Executor ASYNC_POOL() { 85 __gshared Executor inst; 86 return initOnce!inst({ 87 Executor e; 88 if(USE_COMMON_POOL){ 89 e = ForkJoinPool.commonPool(); 90 } else { 91 e = new ThreadPerTaskExecutor(); 92 } 93 return e; 94 }()); 95 } 96 97 98 shared static this() { 99 NIL = new AltResult(null); 100 } 101 102 103 /** 104 */ 105 abstract class AbstractCompletableFuture { 106 shared bool _isDone = false; 107 bool _isNull = true; 108 109 AltResult altResult; 110 111 Completion stack; // Top of Treiber stack of dependent actions 112 113 abstract void bipush(AbstractCompletableFuture b, BiCompletion c); 114 abstract void cleanStack(); 115 abstract bool completeExceptionally(Throwable ex); 116 abstract void postComplete(); 117 abstract void unipush(Completion c); 118 119 /** 120 * Returns {@code true} if completed in any fashion: normally, 121 * exceptionally, or via cancellation. 122 * 123 * @return {@code true} if completed 124 */ 125 bool isDone() { 126 return _isDone; 127 } 128 129 130 /** 131 * Returns {@code true} if this CompletableFuture was cancelled 132 * before it completed normally. 133 * 134 * @return {@code true} if this CompletableFuture was cancelled 135 * before it completed normally 136 */ 137 bool isCancelled() { 138 AltResult ar = altResult; 139 if (ar !is null) { 140 CancellationException ce = cast(CancellationException)ar.ex; 141 if(ce !is null) 142 return true; 143 } 144 return false; 145 } 146 147 /** 148 * Returns {@code true} if this CompletableFuture completed 149 * exceptionally, in any way. Possible causes include 150 * cancellation, explicit invocation of {@code 151 * completeExceptionally}, and abrupt termination of a 152 * CompletionStage action. 
153 * 154 * @return {@code true} if this CompletableFuture completed 155 * exceptionally 156 */ 157 bool isCompletedExceptionally() { 158 // Object r = result; 159 // AltResult ar = cast(AltResult)r; 160 // return ar !is null && r !is NIL; 161 162 return altResult !is null && altResult !is NIL; 163 } 164 165 bool isCompletedSuccessfully() { 166 return _isDone && altResult is null; 167 } 168 169 alias isFaulted = isCompletedExceptionally; 170 alias isCompleted = isDone; 171 } 172 173 /** 174 * A {@link Future} that may be explicitly completed (setting its 175 * value and status), and may be used as a {@link CompletionStage}, 176 * supporting dependent functions and actions that trigger upon its 177 * completion. 178 * 179 * <p>When two or more threads attempt to 180 * {@link #complete complete}, 181 * {@link #completeExceptionally completeExceptionally}, or 182 * {@link #cancel cancel} 183 * a CompletableFuture, only one of them succeeds. 184 * 185 * <p>In addition to these and related methods for directly 186 * manipulating status and results, CompletableFuture implements 187 * interface {@link CompletionStage} with the following policies: <ul> 188 * 189 * <li>Actions supplied for dependent completions of 190 * <em>non-async</em> methods may be performed by the thread that 191 * completes the current CompletableFuture, or by any other caller of 192 * a completion method. 193 * 194 * <li>All <em>async</em> methods without an explicit Executor 195 * argument are performed using the {@link ForkJoinPool#commonPool()} 196 * (unless it does not support a parallelism level of at least two, in 197 * which case, a new Thread is created to run each task). This may be 198 * overridden for non-static methods in subclasses by defining method 199 * {@link #defaultExecutor()}. To simplify monitoring, debugging, 200 * and tracking, all generated asynchronous tasks are instances of the 201 * marker interface {@link AsynchronousCompletionTask}. 
Operations 202 * with time-delays can use adapter methods defined in this class, for 203 * example: {@code supplyAsync(supplier, delayedExecutor(timeout, 204 * timeUnit))}. To support methods with delays and timeouts, this 205 * class maintains at most one daemon thread for triggering and 206 * cancelling actions, not for running them. 207 * 208 * <li>All CompletionStage methods are implemented independently of 209 * other methods, so the behavior of one method is not impacted 210 * by overrides of others in subclasses. 211 * 212 * <li>All CompletionStage methods return CompletableFutures. To 213 * restrict usages to only those methods defined in interface 214 * CompletionStage, use method {@link #minimalCompletionStage}. Or to 215 * ensure only that clients do not themselves modify a future, use 216 * method {@link #copy}. 217 * </ul> 218 * 219 * <p>CompletableFuture also implements {@link Future} with the following 220 * policies: <ul> 221 * 222 * <li>Since (unlike {@link FutureTask}) this class has no direct 223 * control over the computation that causes it to be completed, 224 * cancellation is treated as just another form of exceptional 225 * completion. Method {@link #cancel cancel} has the same effect as 226 * {@code completeExceptionally(new CancellationException())}. Method 227 * {@link #isCompletedExceptionally} can be used to determine if a 228 * CompletableFuture completed in any exceptional fashion. 229 * 230 * <li>In case of exceptional completion with a CompletionException, 231 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an 232 * {@link ExecutionException} with the same cause as held in the 233 * corresponding CompletionException. To simplify usage in most 234 * contexts, this class also defines methods {@link #join()} and 235 * {@link #getNow} that instead throw the CompletionException directly 236 * in these cases. 
237 * </ul> 238 * 239 * <p>Arguments used to pass a completion result (that is, for 240 * parameters of type {@code T}) for methods accepting them may be 241 * null, but passing a null value for any other parameter will result 242 * in a {@link NullPointerException} being thrown. 243 * 244 * <p>Subclasses of this class should normally override the "virtual 245 * constructor" method {@link #newIncompleteFuture}, which establishes 246 * the concrete type returned by CompletionStage methods. For example, 247 * here is a class that substitutes a different default Executor and 248 * disables the {@code obtrude} methods: 249 * 250 * <pre> {@code 251 * class MyCompletableFuture(T) : CompletableFuture!(T) { 252 * final Executor myExecutor = ...; 253 * MyCompletableFuture() { } 254 * <U> CompletableFuture!(U) newIncompleteFuture() { 255 * return new MyCompletableFuture!(U)(); } 256 * Executor defaultExecutor() { 257 * return myExecutor; } 258 * void obtrudeValue(T value) { 259 * throw new UnsupportedOperationException(); } 260 * void obtrudeException(Throwable ex) { 261 * throw new UnsupportedOperationException(); } 262 * }}</pre> 263 * 264 * @author Doug Lea 265 * @param <T> The result type returned by this future's {@code join} 266 * and {@code get} methods 267 */ 268 class CompletableFuture(T) : 269 AbstractCompletableFuture, Future!(T), Promise!(T), CompletionStage!(T) { 270 271 // A CompletableFuture is also a Promise. 
    // https://www.eclipse.org/jetty/javadoc/9.4.8.v20171121/org/eclipse/jetty/util/Promise.Completable.html

    // The callback aliases and the set of constructors depend on whether the
    // future carries a value: a void future has no `result` field and its
    // callbacks take no value argument.
    static if(is(T == void)) {

        alias ConsumerT = Action;
        alias BiConsumerT = Action1!(Throwable);
        alias FunctionT(V) = Func!(V);

        /**
         * Creates a void future; when `completed` is true it is created
         * already completed (non-CAS path, the object is not yet shared).
         */
        this(bool completed = false) {
            if(completed) completeValue!false();
        }

    } else {

        alias ConsumerT = Consumer!(T);
        alias BiConsumerT = Action2!(T, Throwable);
        alias FunctionT(V) = Func1!(T, V);

        // The value of a normal completion. Unlike the Java original, null
        // and exceptional outcomes are not boxed here; the completeValue /
        // completeThrowable overloads in this class store them in altResult.
        private T result;

        /**
         * Creates a new CompletableFuture that is already completed with
         * the given value.
         */
        this(T r) {
            completeValue!false(r);
        }
    }

    /**
     * Creates a new incomplete CompletableFuture.
     */
    this() {
    }

    /**
     * Creates a new CompletableFuture that is already completed with the
     * given alternative outcome (NIL or an exception holder).
     */
    this(AltResult r) {
        completeValue!false(r);
    }

    /*
     * Overview:
     *
     * NOTE: this overview is inherited from the Java implementation. In
     * this port completion is tracked by the `_isDone` flag rather than
     * by a non-null "result" field, and null / exceptional outcomes are
     * kept in the separate `altResult` field.
     *
     * A CompletableFuture may have dependent completion actions,
     * collected in a linked stack. It atomically completes by CASing
     * a result field, and then pops off and runs those actions. This
     * applies across normal vs exceptional outcomes, sync vs async
     * actions, binary triggers, and various forms of completions.
     *
     * Non-nullness of field "result" indicates done.  It may
     * be set directly if known to be thread-confined, else via CAS.
     * An AltResult is used to box null as a result, as well as to
     * hold exceptions.  Using a single field makes completion simple
     * to detect and trigger.  Result encoding and decoding is
     * straightforward but tedious and adds to the sprawl of trapping
     * and associating exceptions with targets.  Minor simplifications
     * rely on (static) NIL (to box null results) being the only
     * AltResult with a null exception field, so we don't usually need
     * explicit comparisons.
Even though some of the generics casts 329 * are unchecked (see SuppressWarnings annotations), they are 330 * placed to be appropriate even if checked. 331 * 332 * Dependent actions are represented by Completion objects linked 333 * as Treiber stacks headed by field "stack". There are Completion 334 * classes for each kind of action, grouped into: 335 * - single-input (UniCompletion), 336 * - two-input (BiCompletion), 337 * - projected (BiCompletions using exactly one of two inputs), 338 * - shared (CoCompletion, used by the second of two sources), 339 * - zero-input source actions, 340 * - Signallers that unblock waiters. 341 * class Completion : ForkJoinTask to enable async execution 342 * (adding no space overhead because we exploit its "tag" methods 343 * to maintain claims). It is also declared as Runnable to allow 344 * usage with arbitrary executors. 345 * 346 * Support for each kind of CompletionStage relies on a separate 347 * class, along with two CompletableFuture methods: 348 * 349 * * A Completion class with name X corresponding to function, 350 * prefaced with "Uni", "Bi", or "Or". Each class contains 351 * fields for source(s), actions, and dependent. They are 352 * boringly similar, differing from others only with respect to 353 * underlying functional forms. We do this so that users don't 354 * encounter layers of adapters in common usages. 355 * 356 * * Boolean CompletableFuture method x(...) (for example 357 * biApply) takes all of the arguments needed to check that an 358 * action is triggerable, and then either runs the action or 359 * arranges its async execution by executing its Completion 360 * argument, if present. The method returns true if known to be 361 * complete. 362 * 363 * * Completion method tryFire(int mode) invokes the associated x 364 * method with its held arguments, and on success cleans up. 
365 * The mode argument allows tryFire to be called twice (SYNC, 366 * then ASYNC); the first to screen and trap exceptions while 367 * arranging to execute, and the second when called from a task. 368 * (A few classes are not used async so take slightly different 369 * forms.) The claim() callback suppresses function invocation 370 * if already claimed by another thread. 371 * 372 * * Some classes (for example UniApply) have separate handling 373 * code for when known to be thread-confined ("now" methods) and 374 * for when shared (in tryFire), for efficiency. 375 * 376 * * CompletableFuture method xStage(...) is called from a public 377 * stage method of CompletableFuture f. It screens user 378 * arguments and invokes and/or creates the stage object. If 379 * not async and already triggerable, the action is run 380 * immediately. Otherwise a Completion c is created, and 381 * submitted to the executor if triggerable, or pushed onto f's 382 * stack if not. Completion actions are started via c.tryFire. 383 * We recheck after pushing to a source future's stack to cover 384 * possible races if the source completes while pushing. 385 * Classes with two inputs (for example BiApply) deal with races 386 * across both while pushing actions. The second completion is 387 * a CoCompletion pointing to the first, shared so that at most 388 * one performs the action. The multiple-arity methods allOf 389 * does this pairwise to form trees of completions. Method 390 * anyOf is handled differently from allOf because completion of 391 * any source should trigger a cleanStack of other sources. 392 * Each AnyOf completion can reach others via a shared array. 393 * 394 * Note that the generic type parameters of methods vary according 395 * to whether "this" is a source, dependent, or completion. 396 * 397 * Method postComplete is called upon completion unless the target 398 * is guaranteed not to be observable (i.e., not yet returned or 399 * linked). 
Multiple threads can call postComplete, which 400 * atomically pops each dependent action, and tries to trigger it 401 * via method tryFire, in NESTED mode. Triggering can propagate 402 * recursively, so NESTED mode returns its completed dependent (if 403 * one exists) for further processing by its caller (see method 404 * postFire). 405 * 406 * Blocking methods get() and join() rely on Signaller Completions 407 * that wake up waiting threads. The mechanics are similar to 408 * Treiber stack wait-nodes used in FutureTask, Phaser, and 409 * SynchronousQueue. See their internal documentation for 410 * algorithmic details. 411 * 412 * Without precautions, CompletableFutures would be prone to 413 * garbage accumulation as chains of Completions build up, each 414 * pointing back to its sources. So we null out fields as soon as 415 * possible. The screening checks needed anyway harmlessly ignore 416 * null arguments that may have been obtained during races with 417 * threads nulling out fields. We also try to unlink non-isLive 418 * (fired or cancelled) Completions from stacks that might 419 * otherwise never be popped: Method cleanStack always unlinks non 420 * isLive completions from the head of stack; others may 421 * occasionally remain if racing with other cancellations or 422 * removals. 423 * 424 * Completion fields need not be declared as final or volatile 425 * because they are only visible to other threads upon safe 426 * publication. 427 */ 428 429 430 // final bool internalComplete(T r) { // CAS from null to r 431 // // return AtomicHelper.compareAndSet(this.result, null, r); 432 // if(AtomicHelper.compareAndSet(_isDone, false, true)) { 433 // this.result = r; 434 // _isNull = false; 435 // return true; 436 // } 437 // return false; 438 // } 439 440 /** Returns true if successfully pushed c onto stack. 
*/ 441 final bool tryPushStack(Completion c) { 442 // info(typeid(c).name); 443 444 Completion h = stack; 445 446 AtomicHelper.store(c.next, h); // CAS piggyback 447 bool r = AtomicHelper.compareAndSet(this.stack, h, c); 448 // Completion x = this.stack; 449 // while(x !is null) { 450 // tracef("%s, Completion: %s", cast(Object*)this, typeid(x).name); 451 // x = x.next; 452 // } 453 return r; 454 } 455 456 /** Unconditionally pushes c onto stack, retrying if necessary. */ 457 final void pushStack(Completion c) { 458 do {} while (!tryPushStack(c)); 459 } 460 461 /* ------------- Encoding and decoding outcomes -------------- */ 462 463 /** Completes with the null value, unless already completed. */ 464 final bool completeNull(bool useCas = true)() { 465 // return AtomicHelper.compareAndSet(this.result, null, NIL); 466 static if(useCas) { 467 if(AtomicHelper.compareAndSet(_isDone, false, true)) { 468 _isNull = true; 469 altResult = NIL; 470 return true; 471 } 472 } else { 473 if(!_isDone) { 474 _isNull = true; 475 altResult = NIL; 476 _isDone = true; 477 return true; 478 } 479 } 480 481 return false; 482 } 483 484 /** Returns the encoding of the given non-exceptional value. */ 485 // final Object encodeValue(T t) { 486 // return (t is null) ? NIL : cast(Object)t; 487 // } 488 489 /** Completes with a non-exceptional result, unless already completed. */ 490 491 static if(is(T == void)) { 492 final bool completeValue(bool useCas = true)() { 493 // return AtomicHelper.compareAndSet(this.result, null, (t is null) ? NIL : cast(Object)t); 494 static if(useCas) { 495 if(AtomicHelper.compareAndSet(_isDone, false, true)) { 496 _isNull = false; 497 return true; 498 } 499 } 500 else { 501 if(!_isDone) { 502 _isDone = true; 503 _isNull = false; 504 return true; 505 } 506 } 507 return false; 508 } 509 } else { 510 final bool completeValue(bool useCas = true)(T t) { 511 // return AtomicHelper.compareAndSet(this.result, null, (t is null) ? 
NIL : cast(Object)t); 512 static if(useCas) { 513 if(AtomicHelper.compareAndSet(_isDone, false, true)) { 514 this.result = t; 515 _isNull = false; 516 return true; 517 } 518 } 519 else { 520 if(!_isDone) { 521 this.result = t; 522 _isDone = true; 523 _isNull = false; 524 return true; 525 } 526 } 527 return false; 528 } 529 } 530 531 final bool completeValue(bool useCas = true)(AltResult r) { 532 static if(useCas) { 533 if(AtomicHelper.compareAndSet(_isDone, false, true)) { 534 altResult = r; 535 return true; 536 } 537 } else { 538 if(!_isDone) { 539 _isDone = true; 540 altResult = r; 541 return true; 542 } 543 } 544 return false; 545 } 546 547 /** Completes with an exceptional result, unless already completed. */ 548 private final bool completeThrowable(bool useCas = true)(Throwable x) { 549 // return AtomicHelper.compareAndSet(this.result, null, encodeThrowable(x)); 550 static if(useCas) { 551 if(AtomicHelper.compareAndSet(_isDone, false, true)) { 552 altResult = encodeThrowable(x); 553 return true; 554 } 555 } else { 556 if(!_isDone) { 557 _isDone = true; 558 altResult = encodeThrowable(x); 559 return true; 560 } 561 } 562 return false; 563 } 564 565 566 /** 567 * Completes with the given (non-null) exceptional result as a 568 * wrapped CompletionException unless it is one already, unless 569 * already completed. May complete with the given Object r 570 * (which must have been the result of a source future) if it is 571 * equivalent, i.e. if this is a simple propagation of an 572 * existing CompletionException. 
     */
    private final bool completeThrowable(bool useCas = true)(Throwable x, AltResult r) {
        // return AtomicHelper.compareAndSet(this.result, null, encodeThrowable(x, r));
        static if(useCas) {
            if(AtomicHelper.compareAndSet(_isDone, false, true)) {
                // NOTE(review): this store repeats what the successful CAS
                // just did; the single-argument overload omits it.
                _isDone = true;
                altResult = encodeThrowable(x, r);
                return true;
            }
        } else {
            if(!_isDone) {
                _isDone = true;
                altResult = encodeThrowable(x, r);
                return true;
            }
        }
        return false;
    }

    /**
     * Returns the encoding of the given arguments: if the exception
     * is non-null, encodes as AltResult.  Otherwise uses the given
     * value, boxed as NIL if null.
     * (Disabled: kept from the Java original for reference.)
     */
    // Object encodeOutcome(T t, Throwable x) {
    //     return (x is null) ? (t is null) ? NIL : cast(Object)t : encodeThrowable(x);
    // }


    /**
     * Completes with r or a copy of r, unless already completed.
     * If exceptional, r is first coerced to a CompletionException.
     * (Disabled: kept from the Java original for reference.)
     */
    // final bool completeRelay(T r, AltResult ar) {
    //     // return AtomicHelper.compareAndSet(this.result, null, encodeRelay(r));

    //     if(AtomicHelper.compareAndSet(_isDone, false, true)) {
    //         // altResult = encodeThrowable(x, r);
    //         // this.result = encodeRelay(r);
    //         implementationMissing(false);
    //         return true;
    //     }
    //     return false;
    // }


    /* ------------- Base Completion classes and operations -------------- */


    /**
     * Pops and tries to trigger all reachable dependents.  Call only
     * when known to be done.
     */
    protected final override void postComplete() {
        /*
         * On each step, variable f holds current dependents to pop
         * and run.  It is extended along only one path at a time,
         * pushing others to avoid unbounded recursion.
         */

        AbstractCompletableFuture f = this; Completion h;
        // Continue while f still has dependents; when f (a dependent future
        // returned by tryFire) is exhausted, fall back to this future's stack.
        while ((h = f.stack) !is null ||
               (f !is this && (h = (f = this).stack) !is null)) {
            AbstractCompletableFuture d; Completion t;
            t = h.next;
            // infof("this: %s, h: %s", cast(Object*)this, typeid(h).name);

            // Pop h from f's stack; on CAS failure simply re-read and retry.
            if(AtomicHelper.compareAndSet(f.stack, h, t)) {
                if (t !is null) {
                    if (f !is this) {
                        // h came from a dependent's stack and is not the last
                        // entry: move it onto our own stack instead of firing
                        // it here, which keeps the traversal iterative.
                        pushStack(h);
                        continue;
                    }
                    AtomicHelper.compareAndSet(h.next, t, null); // try to detach
                }
                // infof("Completion: %s, this: %s", typeid(h).name, typeid(this).name);
                // In NESTED mode tryFire hands back the dependent it completed
                // (or null); that dependent's stack is processed next.
                d = h.tryFire(NESTED);
                f = (d is null) ? this : d;
            }
        }
    }

    /** Traverses stack and unlinks one or more dead Completions, if found. */
    protected final override void cleanStack() {
        Completion p = stack;
        // ensure head of stack live
        for (bool unlinked = false;;) {
            if (p is null)
                return;
            else if (p.isLive()) {
                // A live head: stop here if something was already unlinked,
                // otherwise go on to look for one dead interior node.
                if (unlinked)
                    return;
                else
                    break;
            }
            // NOTE(review): p is advanced inside the argument list of the CAS;
            // this presumably relies on the expected value being read before
            // the assignment — confirm against AtomicHelper.compareAndSet.
            else if (AtomicHelper.compareAndSet(this.stack, p, (p = p.next)))
                unlinked = true;
            else
                p = stack;
        }
        // try to unlink first non-live
        for (Completion q = p.next; q !is null;) {
            Completion s = q.next;
            if (q.isLive()) {
                p = q;
                q = s;
            } else if (AtomicHelper.compareAndSet(p.next, q, s))
                break;
            else
                q = p.next;
        }
    }

    /* ------------- One-input Completions -------------- */

    /**
     * Pushes the given completion unless it completes while trying.
     * Caller should first check that this future is not yet done.
     * If the future turns out to be done after the push attempt, the
     * completion is fired immediately in SYNC mode. A null c is ignored.
     */
    protected final override void unipush(Completion c) {
        if (c !is null) {
            while (!tryPushStack(c)) {
                if (_isDone) {
                    // Completed while we were retrying: give up pushing and
                    // clear the link that tryPushStack stored into c.next.
                    AtomicHelper.store(c.next, null);
                    break;
                }
            }
            // Re-check after pushing to cover a completion racing the push.
            if (_isDone)
                c.tryFire(SYNC);
        }
    }

    /**
     * Post-processing by dependent after successful UniCompletion tryFire.
707 * Tries to clean stack of source a, and then either runs postComplete 708 * or returns this to caller, depending on mode. 709 */ 710 private final CompletableFuture!(T) postFire(AbstractCompletableFuture a, int mode) { 711 712 // infof("this: %s, h: %s", cast(Object*)this, typeid(this.stack).name); 713 714 if (a !is null && a.stack !is null) { 715 bool done = a._isDone; 716 if (!done) 717 a.cleanStack(); 718 if (mode >= 0 && (done || a._isDone)) 719 a.postComplete(); 720 } 721 722 // if(stack is null) 723 // infof("this: %s, mode=%d, result: %s, stack: null", cast(Object*)this, mode, result is null); 724 // else 725 // infof("this: %s, mode=%d, result: %s, stack: %s", cast(Object*)this, mode, result is null, typeid(this.stack).name); 726 727 if (_isDone && stack !is null) { 728 if (mode < 0) 729 return this; 730 else 731 postComplete(); 732 } 733 return null; 734 } 735 736 private CompletableFuture!(V) uniApplyStage(V)(Executor e, FunctionT!(V) f) { 737 if (f is null) throw new NullPointerException(); 738 if (_isDone) { 739 return uniApplyNow!(V)(e, f); 740 } 741 CompletableFuture!(V) d = newIncompleteFuture!(V)(); 742 unipush(new UniApply!(T,V)(e, d, this, f)); 743 return d; 744 } 745 746 747 private CompletableFuture!(V) uniApplyNow(V)(Executor e, FunctionT!(V) f) { 748 749 CompletableFuture!(V) d = newIncompleteFuture!(V)(); 750 751 AltResult ar = this.altResult; 752 if(ar !is null) { 753 Throwable x = ar.ex; 754 if (x !is null) { 755 d.completeThrowable!false(x, ar); 756 return d; 757 } 758 } 759 760 try { 761 if (e !is null) { 762 e.execute(new UniApply!(T, V)(null, d, this, f)); 763 } else { 764 765 static if(is(T == void)) { 766 d.completeValue!false(f()); 767 } else { 768 d.completeValue!false(f(this.result)); 769 } 770 } 771 } catch (Throwable ex) { 772 d.completeThrowable!false(ex); 773 } 774 return d; 775 } 776 777 778 private CompletableFuture!(void) uniAcceptStage(Executor e, 779 ConsumerT f) { 780 if (f is null) throw new NullPointerException(); 
781 if (isDone) 782 return uniAcceptNow(e, f); 783 CompletableFuture!(void) d = newIncompleteFuture!(void)(); 784 unipush(new UniAccept!(T)(e, d, this, f)); 785 return d; 786 } 787 788 private CompletableFuture!(void) uniAcceptNow(Executor e, ConsumerT f) { 789 Throwable x; 790 CompletableFuture!(void) d = newIncompleteFuture!(void)(); 791 AltResult ar = altResult; 792 if (ar !is null) { 793 if ((x = ar.ex) !is null) { 794 d.completeValue!false(encodeThrowable(x, ar)); 795 return d; 796 } 797 // r = null; 798 } 799 800 try { 801 if (e !is null) { 802 e.execute(new UniAccept!(T)(null, d, this, f)); 803 } else { 804 805 static if(is(T == void)) { 806 f(); 807 } else { 808 T t = this.result; 809 f(t); 810 } 811 d.completeValue!false(NIL); 812 } 813 } catch (Throwable ex) { 814 d.completeThrowable!false(ex); 815 } 816 return d; 817 } 818 819 820 private CompletableFuture!(void) uniRunStage(Executor e, Runnable f) { 821 if (f is null) throw new NullPointerException(); 822 if (!isDone()) 823 return uniRunNow(altResult, e, f); 824 CompletableFuture!(void) d = newIncompleteFuture!(void)(); 825 unipush(new UniRun!(T)(e, d, this, f)); 826 return d; 827 } 828 829 private CompletableFuture!(void) uniRunNow(AltResult ar, Executor e, Runnable f) { 830 Throwable x; 831 CompletableFuture!(void) d = newIncompleteFuture!(void)(); 832 if (ar !is null && (x = ar.ex) !is null) 833 d.completeValue!false(encodeThrowable(x, ar)); 834 else 835 try { 836 if (e !is null) { 837 e.execute(new UniRun!(T)(null, d, this, f)); 838 } else { 839 f.run(); 840 d.completeNull(); 841 } 842 } catch (Throwable ex) { 843 d.completeThrowable!false(ex); 844 } 845 return d; 846 } 847 848 849 final bool uniWhenComplete(CompletableFuture!(T) r, 850 BiConsumerT f, 851 UniWhenComplete!(T) c) { 852 Throwable x = null; 853 if (!_isDone) { 854 try { 855 if (c !is null && !c.claim()) 856 return false; 857 AltResult ar = r.altResult; 858 if (ar !is null) { 859 x = ar.ex; 860 warning("Need to check"); 861 862 static 
if(is(T == void)) { 863 f(x); 864 } else { 865 f(T.init, x); 866 } 867 if (x is null) { 868 completeValue(ar); 869 } 870 } else { 871 872 static if(is(T == void)) { 873 f(x); 874 completeValue(); 875 } else { 876 T t = r.result; 877 f(t, x); 878 completeValue(t); 879 } 880 } 881 882 if (x is null) { 883 return true; 884 } 885 } catch (Throwable ex) { 886 if (x is null) 887 x = ex; 888 else if (x !is ex) 889 x.next = ex; 890 } 891 892 completeThrowable(x, r.altResult); 893 } 894 return true; 895 } 896 897 private CompletableFuture!(T) uniWhenCompleteStage( 898 Executor e, BiConsumerT f) { 899 if (f is null) throw new NullPointerException(); 900 CompletableFuture!(T) d = newIncompleteFuture!(T)(); 901 Object r; 902 if (!isDone) 903 unipush(new UniWhenComplete!(T)(e, d, this, f)); 904 else if (e is null) 905 d.uniWhenComplete(this, f, null); 906 else { 907 try { 908 e.execute(new UniWhenComplete!(T)(null, d, this, f)); 909 } catch (Throwable ex) { 910 d.completeThrowable!false(ex); 911 } 912 } 913 return d; 914 } 915 916 final bool uniHandle(S)(CompletableFuture!(S) r, BiFunction!(S, Throwable, T) f, 917 UniHandle!(S, T) c) { 918 Throwable x; 919 if (!isDone()) { 920 try { 921 if (c !is null && !c.claim()) 922 return false; 923 S s; 924 AltResult ar = r.altResult; 925 if (ar !is null) { 926 x = ar.ex; 927 static if(is(S == class) || is(S == interface)) { 928 s = null; 929 } else { 930 warning("to check"); 931 s = S.init; 932 } 933 } else { 934 x = null; 935 s = r.result; 936 } 937 completeValue(f(s, x)); 938 } catch (Throwable ex) { 939 completeThrowable(ex); 940 } 941 } 942 return true; 943 } 944 945 private CompletableFuture!(V) uniHandleStage(V)(Executor e, 946 BiFunction!(T, Throwable, V) f) { 947 948 if (f is null) throw new NullPointerException(); 949 CompletableFuture!(V) d = newIncompleteFuture!(V)(); 950 951 if (!isDone()) 952 unipush(new UniHandle!(T,V)(e, d, this, f)); 953 else if (e is null) 954 d.uniHandle!(T)(this, f, null); 955 else { 956 try { 957 
e.execute(new UniHandle!(T,V)(null, d, this, f)); 958 } catch (Throwable ex) { 959 d.completeThrowable!false(ex); 960 } 961 } 962 return d; 963 } 964 965 966 final bool uniExceptionally(CompletableFuture!(T) r, 967 Function!(Throwable, T) f, 968 UniExceptionally!(T) c) { 969 Throwable x; 970 if (!isDone()) { 971 try { 972 AltResult ar = r.altResult; 973 if (ar !is null && (x = ar.ex) !is null) { 974 if (c !is null && !c.claim()) 975 return false; 976 977 static if(is(T == void)) { 978 f(x); 979 completeValue(); 980 } else { 981 completeValue(f(x)); 982 } 983 } else { 984 985 static if(is(T == void)) { 986 completeValue(); 987 } else { 988 completeValue(r.result); 989 } 990 } 991 } catch (Throwable ex) { 992 completeThrowable(ex); 993 } 994 } 995 return true; 996 } 997 998 private CompletableFuture!(T) uniExceptionallyStage(Function!(Throwable, T) f) { 999 if (f is null) throw new NullPointerException(); 1000 CompletableFuture!(T) d = newIncompleteFuture!(T)(); 1001 if (!isDone()) 1002 unipush(new UniExceptionally!(T)(d, this, f)); 1003 else 1004 d.uniExceptionally(this, f, null); 1005 return d; 1006 } 1007 1008 private MinimalStage!(T) uniAsMinimalStage() { 1009 if (isDone()) { 1010 if(isFaulted()) 1011 return new MinimalStage!(T)(this.altResult); 1012 else { 1013 1014 static if(is(T == void)) { 1015 return new MinimalStage!(T)(true); 1016 } else { 1017 return new MinimalStage!(T)(this.result); 1018 } 1019 } 1020 } 1021 MinimalStage!(T) d = new MinimalStage!(T)(); 1022 unipush(new UniRelay!(T,T)(d, this)); 1023 return d; 1024 } 1025 1026 private CompletableFuture!(V) uniComposeStage(V)(Executor e, 1027 FunctionT!(CompletionStage!(V)) f) { 1028 1029 if (f is null) throw new NullPointerException(); 1030 CompletableFuture!(V) d = newIncompleteFuture!(V)(); 1031 Throwable x; 1032 if (!isDone()) 1033 unipush(new UniCompose!(T,V)(e, d, this, f)); 1034 else if (e is null) { 1035 T t = this.result; 1036 AltResult ar = this.altResult; 1037 if (ar !is null) { 1038 if ((x = 
ar.ex) !is null) { 1039 d.completeThrowable!false(x, ar); 1040 return d; 1041 } 1042 warning("To check"); 1043 t = T.init; 1044 } 1045 1046 try { 1047 auto gg = f(t).toCompletableFuture(); 1048 CompletableFuture!(V) g = cast(CompletableFuture!(V))gg; 1049 if(g is null && gg !is null) { 1050 warningf("bad cast"); 1051 } 1052 1053 if (g.isDone()) 1054 d.completeValue!false(g.result); 1055 else { 1056 g.unipush(new UniRelay!(V,V)(d, g)); 1057 } 1058 } catch (Throwable ex) { 1059 d.completeThrowable!false(ex); 1060 } 1061 } 1062 else 1063 try { 1064 e.execute(new UniCompose!(T,V)(null, d, this, f)); 1065 } catch (Throwable ex) { 1066 d.completeThrowable!false(ex); 1067 } 1068 return d; 1069 } 1070 1071 /* ------------- Two-input Completions -------------- */ 1072 1073 /** A Completion for an action with two sources */ 1074 1075 /** 1076 * Pushes completion to this and b unless both done. 1077 * Caller should first check that either result or b.result is null. 1078 */ 1079 final override void bipush(AbstractCompletableFuture b, BiCompletion c) { 1080 if (c !is null) { 1081 while (!_isDone) { 1082 if (tryPushStack(c)) { 1083 if (!b._isDone) 1084 b.unipush(new CoCompletion(c)); 1085 else if (_isDone) 1086 c.tryFire(SYNC); 1087 return; 1088 } 1089 } 1090 b.unipush(c); 1091 } 1092 } 1093 1094 /** Post-processing after successful BiCompletion tryFire. 
*/
final CompletableFuture!(T) postFire(AbstractCompletableFuture a,
        AbstractCompletableFuture b, int mode) {
    // Only touch the second source when it still has completions stacked on it.
    if (b !is null && b.stack !is null) { // clean second source
        if (!b.isDone())
            b.cleanStack();
        // mode >= 0 admits SYNC (0) and ASYNC (1); NESTED (-1) skips this step.
        if (mode >= 0 && b.isDone())
            b.postComplete();
    }
    // Finish with the single-source post-processing for the first source.
    return postFire(a, mode);
}


/**
 * Tries to complete this future from two sources by applying {@code f}
 * to their values.
 *
 * The alt-results and raw results of both sources are read up front.
 * If this future is not yet done and either alt-result carries an
 * exception, that exception is handed to completeThrowable and
 * {@code f} is not called. An alt-result without an exception replaces
 * the corresponding argument with its type's {@code .init} value (a
 * warning is logged when that happens). Any Throwable raised while
 * claiming or while running {@code f} is passed to completeThrowable.
 *
 * @param r the first source
 * @param s the second source
 * @param f the function applied to both source values
 * @param c the completion being fired, or null when called directly;
 *          when non-null, {@code c.claim()} must succeed before
 *          {@code f} is run
 * @return {@code false} only when {@code c} is non-null and
 *         {@code c.claim()} returned false; {@code true} in every
 *         other case, including when this future was already done
 */
final bool biApply(R, S)(CompletableFuture!R r, CompletableFuture!S s,
            BiFunction!(R, S, T) f,
            BiApply!(R,S,T) c) {
    Throwable x;

    AltResult ar = r.altResult;
    AltResult ars = s.altResult;
    R rr = r.result;
    S ss = s.result;

    tryComplete: if (!isDone) {
        if (ar !is null) {
            if ((x = ar.ex) !is null) {
                completeThrowable(x, ar);
                // Jumps back to the labelled `if`, which re-tests isDone;
                // presumably it is now true, so the body is skipped.
                goto tryComplete;
            }
            warning("To check");
            rr = R.init;
        }
        if (ars !is null) {
            if ((x = ars.ex) !is null) {
                completeThrowable(x, ars);
                goto tryComplete;
            }
            warning("To check");
            ss = S.init;
        }

        try {
            // A failed claim leaves this future untouched and reports false.
            if (c !is null && !c.claim())
                return false;
            completeValue(f(rr, ss));
        } catch (Throwable ex) {
            completeThrowable(ex);
        }
    }
    return true;
}

/**
 * Creates the dependent future for a two-source "apply" stage.
 *
 * If either source is not done, a BiApply completion is registered
 * through bipush. If both are done and {@code e} is null, the function
 * is applied immediately via biApply. Otherwise a BiApply task is
 * submitted to {@code e}; a Throwable thrown by {@code e.execute} is
 * recorded on the returned future.
 *
 * @param e the executor to run the function on, or null
 * @param o the other stage; converted with toCompletableFuture and cast
 *          to {@code CompletableFuture!(U)}
 * @param f the function combining both results
 * @return the new dependent future
 * @throws NullPointerException if {@code f} is null or the converted
 *         stage is null
 */
private CompletableFuture!(V) biApplyStage(U,V)(
    Executor e, CompletionStage!(U) o, BiFunction!(T, U, V) f) {

    if (f is null)
        throw new NullPointerException();
    CompletableFuture!(U) b = cast(CompletableFuture!(U))o.toCompletableFuture();
    if (b is null)
        throw new NullPointerException();

    CompletableFuture!(V) d = newIncompleteFuture!(V)();

    if (!isDone() || !b.isDone())
        bipush(b, new BiApply!(T,U,V)(e, d, this, b, f));
    else if (e is null)
        d.biApply!(T, U)(this, b, f, null);
    else
        try {
            e.execute(new BiApply!(T,U,V)(null, d, this, b, f));
        } catch (Throwable ex) {
            d.completeThrowable!false(ex);
        }
    return d;
}


/**
 * Tries to complete this future from two sources by running the
 * consumer {@code f} on their values.
 *
 * Follows the same steps as biApply, except that after {@code f}
 * returns this future is completed through completeNull().
 *
 * @param r the first source
 * @param s the second source
 * @param f the consumer invoked with both source values
 * @param c the completion being fired, or null when called directly
 * @return {@code false} only when {@code c} is non-null and
 *         {@code c.claim()} returned false; {@code true} otherwise
 */
final bool biAccept(R, S)(CompletableFuture!R r, CompletableFuture!S s,
            BiConsumer!(R,S) f,
            BiAccept!(R, S) c) {
    Throwable x;

    AltResult ar = r.altResult;
    AltResult ars = s.altResult;
    R rr = r.result;
    S ss = s.result;

    tryComplete: if (!isDone()) {
        if (ar !is null) {
            if ((x = ar.ex) !is null) {
                completeThrowable(x, ar);
                // Re-tests isDone at the label; see biApply.
                goto tryComplete;
            }
            warning("To check");
            rr = R.init;
        }
        if (ars !is null) {
            if ((x = ars.ex) !is null) {
                completeThrowable(x, ars);
                goto tryComplete;
            }
            warning("To check");
            ss = S.init;
        }

        try {
            if (c !is null && !c.claim())
                return false;
            f(rr, ss);
            completeNull();
        } catch (Throwable ex) {
            completeThrowable(ex);
        }
    }
    return true;
}

/**
 * Creates the dependent future for a two-source "accept" stage.
 *
 * Registers a BiAccept completion through bipush while either source
 * is not done; otherwise runs biAccept directly when {@code e} is null,
 * or submits a BiAccept task to {@code e}. A Throwable thrown by
 * {@code e.execute} is recorded on the returned future.
 *
 * @param e the executor to run the consumer on, or null
 * @param o the other stage; converted with toCompletableFuture and cast
 *          to {@code CompletableFuture!(U)}
 * @param f the consumer receiving both results
 * @return the new dependent future
 * @throws NullPointerException if {@code f} is null or the converted
 *         stage is null
 */
private CompletableFuture!(void) biAcceptStage(U)(Executor e,
    CompletionStage!(U) o, BiConsumer!(T, U) f) {

    if (f is null)
        throw new NullPointerException();

    CompletableFuture!(U) b = cast(CompletableFuture!(U))o.toCompletableFuture();
    if (b is null)
        throw new NullPointerException();

    CompletableFuture!(void) d = newIncompleteFuture!(void)();
    if (!isDone() || !b.isDone())
        bipush(b, new BiAccept!(T,U)(e, d, this, b, f));
    else if (e is null)
        d.biAccept!(T, U)(this, b, f, null);
    else
        try {
            e.execute(new BiAccept!(T,U)(null, d, this, b, f));
        } catch (Throwable ex) {
            d.completeThrowable!false(ex);
        }
    return d;
}


/**
 * Tries to complete this future by running {@code f} after two sources.
 *
 * Does nothing when this future is already done. Otherwise an exception
 * recorded in the first source's alt-result is propagated; failing
 * that, one recorded in the second source's alt-result; failing that,
 * {@code f.run()} is invoked and this future is completed through
 * completeNull(). A Throwable raised while claiming or running
 * {@code f} is passed to completeThrowable.
 *
 * @param r the first source
 * @param s the second source
 * @param f the action to run
 * @param c the completion being fired, or null when called directly
 * @return {@code false} only when {@code c} is non-null and
 *         {@code c.claim()} returned false; {@code true} otherwise
 */
final bool biRun(AbstractCompletableFuture r, AbstractCompletableFuture s,
        Runnable f, IUniCompletion c) {
    Throwable x;
    if (!isDone()) {
        AltResult ar = r.altResult;
        AltResult ars = s.altResult;
        if(ar !is null && (x = ar.ex) !is null){
            completeThrowable(x, ar);
        } else if(ars !is null && (x = ars.ex) !is null){
            completeThrowable(x, ars);
        } else {
            try {
                if (c !is null && !c.claim())
                    return false;
                f.run();
                completeNull();
            } catch (Throwable ex) {
                completeThrowable(ex);
            }
        }
    }
    return true;
}

/**
 * Creates the dependent future for a two-source "run" stage.
 *
 * Registers a BiRun completion through bipush while either source is
 * not done; otherwise runs biRun directly when {@code e} is null, or
 * submits a BiRun task to {@code e}. A Throwable thrown by
 * {@code e.execute} is recorded on the returned future.
 *
 * Note that {@code o.toCompletableFuture()} is evaluated before the
 * null checks.
 *
 * @param e the executor to run the action on, or null
 * @param o the other stage
 * @param f the action to run
 * @return the new dependent future
 * @throws NullPointerException if {@code f} is null or the converted
 *         stage is null
 */
private CompletableFuture!(void) biRunStage(U)(Executor e, CompletionStage!U o,
        Runnable f) {
    CompletableFuture!U b = cast(CompletableFuture!U)o.toCompletableFuture();
    if (f is null || b is null)
        throw new NullPointerException();
    CompletableFuture!(void) d = newIncompleteFuture!(void)();
    // if ((r = result) is null || (s = b.result) is null)
    if (!isDone() || !b.isDone())
        bipush(b, new BiRun!(T, U)(e, d, this, b, f));
    else if (e is null)
        d.biRun(this, b, f, null);
    else
        try {
            e.execute(new BiRun!(T, U)(null, d, this, b, f));
        } catch (Throwable ex) {
            d.completeThrowable!false(ex);
        }
    return d;
}


/* ------------- Projected (Ored) BiCompletions -------------- */

/**
 * Pushes completion to this and b unless either done.
 * Caller should first check that result and b.result are both null.
*/
final void orpush(AbstractCompletableFuture b, BiCompletion c) {
    if (c !is null) {
        // Retry the push until it succeeds or this future turns out to be done.
        while (!tryPushStack(c)) {
            if (_isDone) {
                // Push abandoned: clear the link stored in c.
                AtomicHelper.store(c.next, null);
                break;
            }
        }
        if (_isDone)
            c.tryFire(SYNC);
        else
            b.unipush(new CoCompletion(c));
    }
}

/**
 * Creates the dependent future for an "apply to either" stage.
 *
 * If this future is already done, the function is applied to it through
 * uniApplyNow; otherwise, if {@code b} is done, through
 * {@code b.uniApplyNow}. When neither is done, a new incomplete future
 * is created and an OrApply completion is registered through orpush.
 *
 * @param e the executor to use, or null
 * @param o the other stage; converted with toCompletableFuture and cast
 *          to {@code CompletableFuture!(U)}
 * @param f the function to apply
 * @return the dependent future
 * @throws NullPointerException if {@code f} is null or the converted
 *         stage is null
 */
private CompletableFuture!(V) orApplyStage(U, V)( // U : T,V
    Executor e, CompletionStage!(U) o, FunctionT!(V) f) {

    CompletableFuture!(U) b = cast(CompletableFuture!(U))o.toCompletableFuture();
    if (f is null || b is null)
        throw new NullPointerException();

    if(this._isDone)
        return uniApplyNow!(V)(e, f);
    else if(b._isDone) {
        return b.uniApplyNow!(V)(e, f);
    }
    // T r; CompletableFuture!T z;
    // if ((r = (z = this).result) !is null ||
    //     (r = (z = b).result) !is null)
    //     return z.uniApplyNow!(V)(e, f);

    CompletableFuture!(V) d = newIncompleteFuture!(V)();
    orpush(b, new OrApply!(T,U,V)(e, d, this, b, f));
    return d;
}


/**
 * Creates the dependent future for an "accept either" stage.
 *
 * Whichever of this future and {@code b} is found done first (this is
 * tested first) runs the consumer through uniAcceptNow. When neither is
 * done, a new incomplete future is created and an OrAccept completion
 * is registered through orpush.
 *
 * NOTE(review): unlike orApplyStage, {@code o} is cast directly rather
 * than going through {@code o.toCompletableFuture()} - confirm this is
 * intended.
 *
 * @param e the executor to use, or null
 * @param o the other stage, cast to {@code CompletableFuture!(U)}
 * @param f the consumer to run
 * @return the dependent future
 * @throws NullPointerException if {@code f} is null or the cast yields
 *         null
 */
private CompletableFuture!(void) orAcceptStage(U)( // U : T
    Executor e, CompletionStage!(U) o, ConsumerT f) {
    CompletableFuture!(U) b;
    if (f is null || (b = cast(CompletableFuture!(U))o) is null)
        throw new NullPointerException();

    CompletableFuture!T z;
    if ((z = this).isDone() || (z = b).isDone())
        return z.uniAcceptNow(e, f);

    CompletableFuture!(void) d = newIncompleteFuture!(void)();
    orpush(b, new OrAccept!(T,U)(e, d, this, b, f));
    return d;
}

/**
 * Creates the dependent future for a "run after either" stage.
 *
 * Whichever of this future and {@code b} is found done first (this is
 * tested first) runs the action through uniRunNow. When neither is
 * done, a new incomplete future is created and an OrRun completion is
 * registered through orpush.
 *
 * @param e the executor to use, or null
 * @param o the other stage; converted with toCompletableFuture
 * @param f the action to run
 * @return the dependent future
 * @throws NullPointerException if {@code f} is null or the converted
 *         stage is null
 */
private CompletableFuture!(void) orRunStage(U)(Executor e, CompletionStage!U o,
        Runnable f) {
    AbstractCompletableFuture b;
    if (f is null || (b = o.toCompletableFuture()) is null)
        throw new NullPointerException();

    AbstractCompletableFuture z;
    if ((z = this).isDone() || (z = b).isDone())
        return z.uniRunNow(e, f);

    CompletableFuture!(void) d = newIncompleteFuture!(void)();
    orpush(b, new OrRun(e, d, this, b, f));
    return d;
}


/* ------------- Signallers -------------- */

/**
 * Waits until this future is done, or, when {@code interruptible}, until
 * the waiting Signaller is flagged as interrupted.
 *
 * Nothing is returned; callers read result / altResult afterwards.
 * Each pass of the loop performs one step: create the Signaller, push
 * it onto the stack, then block through ForkJoinPool.managedBlock.
 *
 * @param interruptible whether an interrupt ends the wait early; when
 *        false, an observed interrupt is re-asserted on the current
 *        thread after the wait
 */
private void waitingGet(bool interruptible) {
    Signaller q = null;
    bool queued = false;
    while (!isDone()) {
        if (q is null) {
            q = new Signaller(interruptible, Duration.zero, MonoTime.zero);
            // Only a ForkJoinWorkerThread is asked to help before blocking.
            ForkJoinWorkerThread th = cast(ForkJoinWorkerThread)Thread.getThis();
            if (th !is null)
                ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);
        }
        else if (!queued) {
            queued = tryPushStack(q);
        }
        else {
            try {
                ForkJoinPool.managedBlock(q);
            } catch (InterruptedException ie) { // currently cannot happen
                q.interrupted = true;
            }
            if (q.interrupted && interruptible)
                break;
        }
    }
    if (q !is null && queued) {
        q.thread = null;
        if (!interruptible && q.interrupted)
            ThreadEx.currentThread().interrupt();
        // Still not done: the Signaller is left on the stack, so clean it.
        if (!isDone())
            cleanStack();
    }
    if (isDone())
        postComplete();
}

/**
 * Waits for at most {@code timeout} for this future to become done.
 *
 * Returns without waiting when ThreadEx.interrupted() reports true.
 * Returns normally when the future is done or the Signaller was flagged
 * as interrupted. Nothing is returned; callers read result / altResult
 * afterwards.
 *
 * @param timeout the maximum time to wait
 * @throws TimeoutException if {@code timeout} is not positive, or if
 *         the wait ends with the future not done and no interrupt
 *         recorded
 */
private void timedGet(Duration timeout) {
    if (ThreadEx.interrupted())
        return;

    if (timeout <= Duration.zero)
        throw new TimeoutException();

    MonoTime d = MonoTime.currTime + timeout;
    MonoTime deadline = (d == MonoTime.zero) ? MonoTime(1) : d; // avoid 0
    Signaller q = null;
    bool queued = false;
    while (!isDone()) { // similar to untimed
        if (q is null) {
            q = new Signaller(true, timeout, deadline);
            ForkJoinWorkerThread th = cast(ForkJoinWorkerThread)ThreadEx.currentThread();
            if (th !is null)
                ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);
        }
        else if (!queued)
            queued = tryPushStack(q);
        else if (q.remaining <= Duration.zero)
            break;
        else {
            try {
                ForkJoinPool.managedBlock(q);
            } catch (InterruptedException ie) {
                q.interrupted = true;
            }
            if (q.interrupted)
                break;
        }
    }

    // Snapshot the completion state once; every decision below uses it.
    bool r = isDone();
    if (q !is null && queued) {
        q.thread = null;
        if (!r)
            cleanStack();
    }
    if (r)
        postComplete();
    if (r || (q !is null && q.interrupted))
        return ;

    throw new TimeoutException();
}

/* ------------- methods -------------- */

static if(is(T == void)) {
    /**
     * Waits if necessary for this future to complete, then reports the
     * outcome through reportGet(altResult).
     */
    void get() {
        // The braces below are a plain block, not the body of the `if`:
        // reportGet runs whether or not waitingGet was called.
        if (!isDone()) waitingGet(true); {
            reportGet(this.altResult);
        }
    }

    /**
     * Reports the outcome through reportJoin(altResult) if this future
     * is already done; does nothing otherwise.
     */
    void getNow() {
        if(isDone()) {
            reportJoin(this.altResult);
        }
    }

    /**
     * Completes this future through completeValue() and then runs
     * postComplete().
     *
     * @return the value returned by completeValue()
     */
    bool complete() {
        bool triggered = completeValue();
        postComplete();
        return triggered;
    }

    /**
     * Callback invoked when the operation completes.
     */
    void succeeded() {
        complete();
    }

    /**
     * Waits if necessary for at most the given time for this future to
     * complete, then reports the outcome through reportGet(altResult).
     *
     * @param timeout the maximum time to wait
     * @throws TimeoutException if the wait timed out
     */
    void get(Duration timeout) {
        if (!isDone()) timedGet(timeout);
        reportGet(this.altResult);
    }

    /**
     * Waits (non-interruptibly) if necessary for this future to
     * complete, then reports the outcome through reportJoin(altResult).
     */
    void join() {
        if (!isDone()) waitingGet(false);
        reportJoin(this.altResult);
    }

} else {

    /**
     * Waits if necessary for this future to complete, and then
     * returns its result.
     *
     * @return the result value
     * @throws CancellationException if this future was cancelled
     * @throws ExecutionException if this future completed exceptionally
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    T get() {
        if (!isDone()) waitingGet(true);
        return reportGet!(T)(this.result, this.altResult);
    }

    /**
     * Waits if necessary for at most the given time for this future
     * to complete, and then returns its result, if available.
     *
     * @param timeout the maximum time to wait
     * @return the result value
     * @throws CancellationException if this future was cancelled
     * @throws ExecutionException if this future completed exceptionally
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    T get(Duration timeout) {
        if (!isDone()) timedGet(timeout);
        return reportGet!T(this.result, this.altResult);
    }


    /**
     * Returns the result value when complete, or throws an
     * (unchecked) exception if completed exceptionally. To better
     * conform with the use of common functional forms, if a
     * computation involved in the completion of this
     * CompletableFuture threw an exception, this method throws an
     * (unchecked) {@link CompletionException} with the underlying
     * exception as its cause.
     *
     * @return the result value
     * @throws CancellationException if the computation was cancelled
     * @throws CompletionException if this future completed
     * exceptionally or a completion computation threw an exception
     */
    T join() {
        if (!isDone()) waitingGet(false);
        return reportJoin!T(this.result, this.altResult);
    }

    /**
     * Returns the result value (or throws any encountered exception)
     * if completed, else returns the given valueIfAbsent.
     *
     * @param valueIfAbsent the value to return if not completed
     * @return the result value, if completed, else the given valueIfAbsent
     * @throws CancellationException if the computation was cancelled
     * @throws CompletionException if this future completed
     * exceptionally or a completion computation threw an exception
     */
    T getNow(T valueIfAbsent) {
        return (!isDone())
            ? valueIfAbsent
            : reportJoin!T(this.result, this.altResult);
    }
    /**
     * If not already completed, sets the value returned by {@link
     * #get()} and related methods to the given value.
     *
     * @param value the result value
     * @return {@code true} if this invocation caused this CompletableFuture
     * to transition to a completed state, else {@code false}
     */
    bool complete(T value) {
        bool triggered = completeValue(value);
        postComplete();
        return triggered;
    }

    /**
     * Callback invoked when the operation completes.
     *
     * @param result the value to complete this future with
     */
    void succeeded(T result) {
        complete(result);
    }
}

/**
 * If not already completed, causes invocations of {@link #get()}
 * and related methods to throw the given exception.
*
 * @param ex the exception
 * @return {@code true} if this invocation caused this CompletableFuture
 * to transition to a completed state, else {@code false}
 */
override bool completeExceptionally(Throwable ex) {
    if (ex is null) {
        throw new NullPointerException();
    }
    // Wrap the cause so it travels through the alt-result channel.
    AltResult failure = new AltResult(ex);
    const bool transitioned = completeValue(failure);
    postComplete();
    return transitioned;
}

/**
 * Failure callback: forwards the exception to completeExceptionally
 * and discards its return value.
 *
 * @param x the exception describing the failure
 */
void failed(Exception x) {
    completeExceptionally(x);
}


/**
 * Returns the dependent stage produced by uniApplyStage for {@code fn},
 * with no executor.
 *
 * @param fn the function to apply
 * @return the dependent future
 */
CompletableFuture!(U) thenApply(U)(FunctionT!(U) fn) {
    Executor none = null;
    auto dependent = uniApplyStage!(U)(none, fn);
    return dependent;
}

/** Same as thenApply, but passes defaultExecutor() to uniApplyStage. */
CompletableFuture!(U) thenApplyAsync(U)(FunctionT!(U) fn) {
    auto dependent = uniApplyStage!(U)(defaultExecutor(), fn);
    return dependent;
}

/** Same as thenApply, but passes screenExecutor(executor) to uniApplyStage. */
CompletableFuture!(U) thenApplyAsync(U)(FunctionT!(U) fn, Executor executor) {
    auto dependent = uniApplyStage!(U)(screenExecutor(executor), fn);
    return dependent;
}

/**
 * Returns the dependent stage produced by uniAcceptStage for
 * {@code action}, with no executor.
 *
 * @param action the consumer to run
 * @return the dependent future
 */
CompletableFuture!(void) thenAccept(ConsumerT action) {
    Executor none = null;
    auto dependent = uniAcceptStage(none, action);
    return dependent;
}

/** Same as thenAccept, but passes defaultExecutor() to uniAcceptStage. */
CompletableFuture!(void) thenAcceptAsync(ConsumerT action) {
    auto dependent = uniAcceptStage(defaultExecutor(), action);
    return dependent;
}

/** Same as thenAccept, but passes screenExecutor(executor) to uniAcceptStage. */
CompletableFuture!(void) thenAcceptAsync(ConsumerT action,
        Executor executor) {
    auto dependent = uniAcceptStage(screenExecutor(executor), action);
    return dependent;
}

/**
 * Returns the dependent stage produced by uniRunStage for
 * {@code action}, with no executor.
 *
 * @param action the action to run
 * @return the dependent future
 */
CompletableFuture!(void) thenRun(Runnable action) {
    auto dependent = uniRunStage(null, action);
    return dependent;
}

/** Same as thenRun, but passes defaultExecutor() to uniRunStage. */
CompletableFuture!(void) thenRunAsync(Runnable action) {
    auto dependent = uniRunStage(defaultExecutor(), action);
    return dependent;
}

/** Same as thenRun, but passes screenExecutor(executor) to uniRunStage. */
CompletableFuture!(void) thenRunAsync(Runnable action,
        Executor executor) {
    auto dependent = uniRunStage(screenExecutor(executor), action);
    return dependent;
}

/**
 * Returns the dependent stage produced by biApplyStage for this future
 * and {@code other}, with no executor.
 *
 * @param other the other stage
 * @param fn the function combining both results
 * @return the dependent future
 */
CompletableFuture!(V) thenCombine(U, V)(
        CompletionStage!(U) other,
        BiFunction!(T, U, V) fn) {
    auto dependent = biApplyStage!(U, V)(null, other, fn);
    return dependent;
}

/**
 * Like thenCombine, but passes defaultExecutor() to biApplyStage.
 *
 * @param other the other stage
 * @param fn the function combining both results
 * @return the dependent future
 */
CompletableFuture!(V) thenCombineAsync(U, V)(
    CompletionStage!(U) other,
    BiFunction!(T, U, V) fn) {
    return biApplyStage!(U, V)(defaultExecutor(), other, fn);
}

/**
 * Like thenCombine, but passes screenExecutor(executor) to biApplyStage.
 *
 * @param other the other stage
 * @param fn the function combining both results
 * @param executor the executor handed to screenExecutor
 * @return the dependent future
 */
CompletableFuture!(V) thenCombineAsync(U, V)(
    CompletionStage!(U) other,
    BiFunction!(T, U, V) fn, Executor executor) {
    return biApplyStage!(U, V)(screenExecutor(executor), other, fn);
}

/**
 * Returns the dependent stage produced by biAcceptStage for this future
 * and {@code other}, with no executor.
 *
 * @param other the other stage
 * @param action the consumer receiving both results
 * @return the dependent future
 */
CompletableFuture!(void) thenAcceptBoth(U)(CompletionStage!(U) other,
    BiConsumer!(T, U) action) {
    return biAcceptStage!(U)(null, other, action);
}

/** Like thenAcceptBoth, but passes defaultExecutor() to biAcceptStage. */
CompletableFuture!(void) thenAcceptBothAsync(U)(
    CompletionStage!(U) other,
    BiConsumer!(T, U) action) {
    return biAcceptStage!(U)(defaultExecutor(), other, action);
}

/** Like thenAcceptBoth, but passes screenExecutor(executor) to biAcceptStage. */
CompletableFuture!(void) thenAcceptBothAsync(U)(
    CompletionStage!(U) other,
    BiConsumer!(T, U) action, Executor executor) {
    return biAcceptStage!(U)(screenExecutor(executor), other, action);
}

/**
 * Delegate-taking convenience overload: wraps {@code action} in an
 * anonymous Runnable and hands it to biRunStage with no executor.
 *
 * @param other the other stage
 * @param action the delegate to invoke
 * @return the dependent future
 */
CompletableFuture!(void) runAfterBoth(U)(CompletionStage!U other,
                                            Action action) {
    return biRunStage(null, other, new class Runnable {
        void run() {
            action();
        }
    } );
}

/**
 * Returns the dependent stage produced by biRunStage for this future
 * and {@code other}, with no executor.
 *
 * @param other the other stage
 * @param action the action to run
 * @return the dependent future
 */
CompletableFuture!(void) runAfterBoth(U)(CompletionStage!U other,
                                            Runnable action) {
    return biRunStage(null, other, action);
}

/** Like runAfterBoth, but passes defaultExecutor() to biRunStage. */
CompletableFuture!(void) runAfterBothAsync(U)(CompletionStage!U other,
                                                    Runnable action) {
    return biRunStage(defaultExecutor(), other, action);
}

/** Like runAfterBoth, but passes screenExecutor(executor) to biRunStage. */
CompletableFuture!(void) runAfterBothAsync(U)(CompletionStage!U other,
                                                    Runnable action,
                                                    Executor executor) {
    return biRunStage(screenExecutor(executor), other, action);
}

/**
 * Returns the dependent stage produced by orApplyStage for this future
 * and {@code other}, with no executor.
 *
 * @param other the other stage (same value type as this future)
 * @param fn the function to apply
 * @return the dependent future
 */
CompletableFuture!(U) applyToEither(U)(
    CompletionStage!(T) other, FunctionT!(U) fn) {
    return orApplyStage!(T, U)(null, other, fn);
}

/** Like applyToEither, but passes defaultExecutor() to orApplyStage. */
CompletableFuture!(U) applyToEitherAsync(U)(
    CompletionStage!(T) other, FunctionT!(U) fn) {
    return orApplyStage!(T, U)(defaultExecutor(), other, fn);
}

/** Like applyToEither, but passes screenExecutor(executor) to orApplyStage. */
CompletableFuture!(U) applyToEitherAsync(U)(
    CompletionStage!(T) other, FunctionT!(U) fn,
    Executor executor) {
    return orApplyStage!(T, U)(screenExecutor(executor), other, fn);
}

/**
 * Returns the dependent stage produced by orAcceptStage for this future
 * and {@code other}, with no executor.
 *
 * @param other the other stage (same value type as this future)
 * @param action the consumer to run
 * @return the dependent future
 */
CompletableFuture!(void) acceptEither(
    CompletionStage!(T) other, ConsumerT action) {
    return orAcceptStage(null, other, action);
}

/** Like acceptEither, but passes defaultExecutor() to orAcceptStage. */
CompletableFuture!(void) acceptEitherAsync(
    CompletionStage!(T) other, ConsumerT action) {
    return orAcceptStage(defaultExecutor(), other, action);
}

/** Like acceptEither, but passes screenExecutor(executor) to orAcceptStage. */
CompletableFuture!(void) acceptEitherAsync(
    CompletionStage!(T) other, ConsumerT action,
    Executor executor) {
    return orAcceptStage(screenExecutor(executor), other, action);
}

/**
 * Returns the dependent stage produced by orRunStage for this future
 * and {@code other}, with no executor.
 *
 * The parameter type is {@code CompletionStage!U}; it was previously
 * misspelt as {@code CompletionStag!U}, which prevented this template
 * from being instantiated.
 *
 * @param other the other stage
 * @param action the action to run
 * @return the dependent future
 */
CompletableFuture!(void) runAfterEither(U)(CompletionStage!U other,
                                            Runnable action) {
    return orRunStage(null, other, action);
}

/** Like runAfterEither, but passes defaultExecutor() to orRunStage. */
CompletableFuture!(void) runAfterEitherAsync(U)(CompletionStage!U other,
                                                    Runnable action) {
    return orRunStage(defaultExecutor(), other, action);
}

/** Like runAfterEither, but passes screenExecutor(executor) to orRunStage. */
CompletableFuture!(void) runAfterEitherAsync(U)(CompletionStage!U other,
                                                    Runnable action,
                                                    Executor executor) {
    return orRunStage(screenExecutor(executor), other, action);
}

/**
 * Returns the dependent stage produced by uniComposeStage for
 * {@code fn}, with no executor.
 *
 * @param fn the function returning the stage to compose with
 * @return the dependent future
 */
CompletableFuture!(U) thenCompose(U)(
    FunctionT!(CompletionStage!(U)) fn) {
    return uniComposeStage!(U)(null, fn);
}

/** Like thenCompose, but passes defaultExecutor() to uniComposeStage. */
CompletableFuture!(U) thenComposeAsync(U)(
    FunctionT!(CompletionStage!(U)) fn) {
    return uniComposeStage!(U)(defaultExecutor(), fn);
}

/** Like thenCompose, but passes screenExecutor(executor) to uniComposeStage. */
CompletableFuture!(U) thenComposeAsync(U)(
    FunctionT!(CompletionStage!(U)) fn,
    Executor executor) {
    return uniComposeStage!(U)(screenExecutor(executor), fn);
}

/**
 * Returns the dependent stage produced by uniWhenCompleteStage for
 * {@code action}, with no executor.
 *
 * @param action the callback to invoke on completion
 * @return the dependent future
 */
CompletableFuture!(T) whenComplete(BiConsumerT action) {
    return uniWhenCompleteStage(null, action);
}

/** Like whenComplete, but passes defaultExecutor() to uniWhenCompleteStage. */
CompletableFuture!(T) whenCompleteAsync(BiConsumerT action) {
    return uniWhenCompleteStage(defaultExecutor(), action);
}

/** Like whenComplete, but passes screenExecutor(executor) to uniWhenCompleteStage. */
CompletableFuture!(T) whenCompleteAsync(
    BiConsumerT action, Executor executor) {
    return uniWhenCompleteStage(screenExecutor(executor), action);
}

/**
 * Returns the dependent stage produced by uniHandleStage for
 * {@code fn}, with no executor.
 *
 * @param fn the function receiving the result and the exception
 * @return the dependent future
 */
CompletableFuture!(U) handle(U)(BiFunction!(T, Throwable, U) fn) {
    return uniHandleStage!(U)(null, fn);
}

/** Like handle, but passes defaultExecutor() to uniHandleStage. */
CompletableFuture!(U) handleAsync(U)(
    BiFunction!(T, Throwable, U) fn) {
    return uniHandleStage!(U)(defaultExecutor(), fn);
}

/** Like handle, but passes screenExecutor(executor) to uniHandleStage. */
CompletableFuture!(U) handleAsync(U)(
    BiFunction!(T, Throwable, U) fn, Executor executor) {
    return uniHandleStage!(U)(screenExecutor(executor), fn);
}

/**
 * Returns this CompletableFuture.
 *
 * @return this CompletableFuture
 */
CompletableFuture!(T) toCompletableFuture() {
    return this;
}

// not in interface CompletionStage

/**
 * Returns a new CompletableFuture that is completed when this
 * CompletableFuture completes, with the result of the given
 * function of the exception triggering this CompletableFuture's
 * completion when it completes exceptionally; otherwise, if this
 * CompletableFuture completes normally, then the returned
 * CompletableFuture also completes normally with the same value.
 * Note: More flexible versions of this functionality are
 * available using methods {@code whenComplete} and {@code handle}.
*
 * @param fn the function to use to compute the value of the
 * returned CompletableFuture if this CompletableFuture completed
 * exceptionally
 * @return the new CompletableFuture
 */
CompletableFuture!(T) exceptionally(Function!(Throwable, T) fn) {
    auto recovered = uniExceptionallyStage(fn);
    return recovered;
}


/* ------------- Control and status methods -------------- */

/**
 * Attempts to complete this CompletableFuture with a
 * {@link CancellationException} when it is not already done, and then
 * runs postComplete().
 *
 * @param mayInterruptIfRunning not read by this method
 *
 * @return {@code true} if this call performed the cancellation, or
 * else whatever isCancelled() reports afterwards
 */
bool cancel(bool mayInterruptIfRunning) {
    bool wonRace = false;
    if (!_isDone) {
        // The completion attempt is made only when not yet done.
        wonRace = completeValue(new AltResult(new CancellationException()));
    }
    postComplete();
    if (wonRace) {
        return true;
    }
    return isCancelled();
}

/**
 * Forcibly causes subsequent invocations of method {@link #get()}
 * and related methods to throw the given exception, whether or
 * not already completed. This method is designed for use only in
 * error recovery actions, and even in such situations may result
 * in ongoing dependent completions using established versus
 * overwritten outcomes.
*
 * @param ex the exception
 * @throws NullPointerException if the exception is null
 */
void obtrudeException(Throwable ex) {
    if (ex is null) {
        throw new NullPointerException();
    }
    // Overwrite the outcome first, then mark done, then notify dependents.
    altResult = new AltResult(ex);
    _isDone = true;
    postComplete();
}

/**
 * Counts the completions currently linked on this future's stack.
 * The count is taken by walking the {@code next} links from
 * {@code stack}, so it is only an estimate suitable for monitoring.
 *
 * @return the number of stacked completions
 */
int getNumberOfDependents() {
    int total = 0;
    Completion node = stack;
    while (node !is null) {
        ++total;
        node = node.next;
    }
    return total;
}

/**
 * Returns {@code super.toString()} followed by a bracketed state:
 * {@code "[Not completed]"}, {@code "[Not completed, N dependents]"},
 * {@code "[Completed normally]"}, or
 * {@code "[Completed exceptionally: <message>]"}.
 *
 * @return a string identifying this CompletableFuture, as well as its state
 */
override string toString() {
    // Counted inline rather than through getNumberOfDependents
    // (original note: "in case disabled").
    int dependents = 0;
    Completion node = stack;
    while (node !is null) {
        ++dependents;
        node = node.next;
    }

    string state;
    if (isDone) {
        state = "[Completed normally]";
        AltResult alt = cast(AltResult)altResult;
        if (alt !is null && alt.ex !is null) {
            state = "[Completed exceptionally: " ~ alt.ex.msg ~ "]";
        }
    } else if (dependents == 0) {
        state = "[Not completed]";
    } else {
        state = "[Not completed, " ~ dependents.to!string ~ " dependents]";
    }
    return super.toString() ~ state;
}

// jdk9 additions

/**
 * Returns the default Executor used for async methods that do not
 * specify an Executor.
* This class uses the {@link
 * ForkJoinPool#commonPool()} if it supports more than one
 * parallel thread, or else an Executor using one thread per async
 * task. This method may be overridden in subclasses to return
 * an Executor that provides at least one independent thread.
 *
 * @return the executor
 */
Executor defaultExecutor() {
    Executor pool = ASYNC_POOL;
    return pool;
}

/**
 * Returns the stage produced by {@code uniCopyStage!(T, T)(this)}.
 * Per the original documentation, the returned future completes
 * normally with the same value as this one, or exceptionally with a
 * CompletionException whose cause is this future's exception, which
 * makes it usable as a "defensive copy" that clients cannot use to
 * complete this future.
 *
 * @return the new CompletableFuture
 */
CompletableFuture!(T) copy() {
    auto duplicate = uniCopyStage!(T, T)(this);
    return duplicate;
}

/**
 * Returns a new CompletionStage that is completed normally with
 * the same value as this CompletableFuture when it completes
 * normally, and cannot be independently completed or otherwise
 * used in ways not defined by the methods of interface {@link
 * CompletionStage}.  If this CompletableFuture completes
 * exceptionally, then the returned CompletionStage completes
 * exceptionally with a CompletionException with this exception as
 * cause.
 *
 * <p>Unless overridden by a subclass, a new non-minimal
 * CompletableFuture with all methods available can be obtained from
 * a minimal CompletionStage via {@link #toCompletableFuture()}.
* For example, completion of a minimal stage can be awaited by
 *
 * <pre> {@code minimalStage.toCompletableFuture().join(); }</pre>
 *
 * @return the new CompletionStage
 */
CompletionStage!(T) minimalCompletionStage() {
    auto minimal = uniAsMinimalStage();
    return minimal;
}

/**
 * Submits an AsyncSupply task, built from this future and
 * {@code supplier}, to {@code executor}.
 *
 * @param supplier a function returning the value to be used
 * to complete this CompletableFuture
 * @param executor the executor to use for asynchronous execution
 * @return this CompletableFuture
 * @throws NullPointerException if either argument is null
 */
CompletableFuture!(T) completeAsync(Supplier!(T) supplier,
        Executor executor) {
    if (supplier is null) {
        throw new NullPointerException();
    }
    if (executor is null) {
        throw new NullPointerException();
    }
    auto task = new AsyncSupply!(T)(this, supplier);
    executor.execute(task);
    return this;
}

/**
 * Completes this CompletableFuture with the result of the given
 * Supplier function invoked from an asynchronous task using the
 * default executor.
*
 * @param supplier a function returning the value to be used
 * to complete this CompletableFuture
 * @return this CompletableFuture
 */
CompletableFuture!(T) completeAsync(Supplier!(T) supplier) {
    return completeAsync(supplier, defaultExecutor());
}



static if(is(T == void)) {

    /**
     * Forcibly marks this future as done and runs postComplete().
     *
     * NOTE(review): altResult is not touched here, so an exception
     * recorded by an earlier exceptional completion stays in place.
     * The value-carrying overload clears it; confirm which
     * representation (null or NIL) a normally completed void future
     * should use before doing the same here.
     */
    void obtrudeValue() {
        _isNull = false;
        _isDone = true;
        postComplete();
    }

    /**
     * If not already done, schedules {@code new Timeout(this)} on the
     * Delayer after {@code timeout}, and registers a whenComplete
     * callback that cancels the scheduled task when this future
     * completes without an exception before the task is done.
     *
     * @param timeout how long to wait before the Timeout task runs
     * @return this CompletableFuture
     */
    CompletableFuture!(T) orTimeout(Duration timeout) {
        if (!_isDone) {
            ScheduledFuture!(void) f = Delayer.delay(new Timeout(this), timeout);
            whenComplete((Throwable ex) {
                if (ex is null && f !is null && !f.isDone())
                    f.cancel(false);
            });
        }
        return this;
    }

    /**
     * If not already done, schedules a DelayedCompleter for this
     * future on the Delayer after {@code timeout}, and registers a
     * whenComplete callback that cancels the scheduled task when this
     * future completes without an exception before the task is done.
     *
     * @param timeout how long to wait before the DelayedCompleter runs
     * @return this CompletableFuture
     */
    CompletableFuture!(T) completeOnTimeout(Duration timeout) {
        if (!_isDone) {
            ScheduledFuture!(void) f =
                Delayer.delay(new DelayedCompleter!(T)(this), timeout);

            whenComplete((Throwable ex) {
                if (ex is null && f !is null && !f.isDone())
                    f.cancel(false);
            });
        }
        return this;
    }

} else {
    /**
     * Forcibly sets or resets the value subsequently returned by
     * method {@link #get()} and related methods, whether or not
     * already completed. This method is designed for use only in
     * error recovery actions, and even in such situations may result
     * in ongoing dependent completions using established versus
     * overwritten outcomes.
     *
     * A null class/interface value is recorded through the NIL
     * alt-result. Any other value is stored in {@code result} and the
     * alt-result is cleared, so that an exception left behind by an
     * earlier exceptional completion no longer overrides the new value.
     *
     * @param value the completion value
     */
    void obtrudeValue(T value) {
        static if(is(T == class) || is(T == interface)) {
            if(value is null) {
                this.altResult = NIL;
                this.result = null;
                _isNull = true;
            } else {
                // Drop any stale exceptional outcome before storing the value.
                this.altResult = null;
                this.result = value;
                _isNull = false;
            }
        } else {
            // Drop any stale exceptional outcome before storing the value.
            this.altResult = null;
            this.result = value;
            _isNull = false;
        }
        _isDone = true;

        postComplete();
    }

    /**
     * Exceptionally completes this CompletableFuture with
     * a {@link TimeoutException} if not otherwise completed
     * before the given timeout.
     *
     * The scheduled Timeout task is cancelled by a whenComplete
     * callback when this future completes without an exception first.
     *
     * @param timeout how long to wait before completing exceptionally
     *        with a TimeoutException
     * @return this CompletableFuture
     */
    CompletableFuture!(T) orTimeout(Duration timeout) {
        if (!_isDone) {
            ScheduledFuture!(void) f = Delayer.delay(new Timeout(this), timeout);
            whenComplete((T ignore, Throwable ex) {
                if (ex is null && f !is null && !f.isDone())
                    f.cancel(false);
            });
        }
        return this;
    }

    /**
     * Completes this CompletableFuture with the given value if not
     * otherwise completed before the given timeout.
     *
     * The scheduled DelayedCompleter task is cancelled by a
     * whenComplete callback when this future completes without an
     * exception first.
     *
     * @param value the value to use upon timeout
     * @param timeout how long to wait before completing normally
     *        with the given value
     * @return this CompletableFuture
     */
    CompletableFuture!(T) completeOnTimeout(T value, Duration timeout) {
        if (!_isDone) {
            ScheduledFuture!(void) f =
                Delayer.delay(new DelayedCompleter!(T)(this, value), timeout);

            whenComplete((T ignore, Throwable ex) {
                if (ex is null && f !is null && !f.isDone())
                    f.cancel(false);
            });
        }
        return this;
    }

} // end of static if / else
} // closes the enclosing class declaration opened above this section



/**
 * Alternative result holder: wraps the Throwable of an exceptional
 * completion. An instance whose {@code ex} is null stands for NIL.
 */
private final class AltResult { // See above
    Throwable ex;        // null only for NIL
    this(Throwable x) { this.ex = x; }
}



/**
 * Base class for completions linked onto a future's stack. It is both
 * a ForkJoinTask and a Runnable; running it in either form calls
 * {@code tryFire(ASYNC)}.
 */
abstract class Completion : ForkJoinTask!(void),
    Runnable, AsynchronousCompletionTask {
    Completion next;      // Treiber stack link

    /**
     * Performs completion action if triggered, returning a
     * dependent that may need propagation, if one exists.
     *
     * @param mode SYNC, ASYNC, or NESTED
     */
    abstract AbstractCompletableFuture tryFire(int mode);

    /** Returns true if possibly still triggerable. Used by cleanStack. */
    abstract bool isLive();

    final void run()                { tryFire(ASYNC); }
    // Always reports false after firing.
    final override bool exec()      { tryFire(ASYNC); return false; }
    // final override void getRawResult()       { return null; }
    // final override void setRawResult(void v) {}
}


/**
 * A marker interface identifying asynchronous tasks produced by
 * {@code async} methods. This may be useful for monitoring,
 * debugging, and tracking asynchronous activities.
*
 */
private interface AsynchronousCompletionTask {
}



/** Fallback executor for when ForkJoinPool.commonPool() cannot support parallelism. */
private final class ThreadPerTaskExecutor : Executor {
    /** Starts a fresh ThreadEx for the given task. */
    void execute(Runnable r) {
        ThreadEx worker = new ThreadEx(r);
        worker.start();
    }
}


/** Operations shared by single-source completions. */
interface IUniCompletion {
    bool claim();
    bool isLive();
}

/** A Completion with a source, dependent, and executor. */
abstract class UniCompletion : Completion, IUniCompletion {
    Executor executor;                 // executor to use (null if none)
    AbstractCompletableFuture dep;          // the dependent to complete
    AbstractCompletableFuture src;          // source for action

    this(Executor executor, AbstractCompletableFuture dep,
        AbstractCompletableFuture src) {
        this.executor = executor;
        this.dep = dep;
        this.src = src;
    }

    /**
     * Returns true if action can be run. Call only when known to
     * be triggerable. Uses FJ tag bit to ensure that only one
     * thread claims ownership. If async, starts as task -- a
     * later call to tryFire will run action.
*/
    final bool claim() {
        Executor e = executor;
        // The tag CAS from 0 to 1 succeeds for exactly one caller.
        if (compareAndSetForkJoinTaskTag(cast(short)0, cast(short)1)) {
            if (e is null)
                return true;
            // Async case: hand this completion to the executor and report false
            // so the caller does not run the action itself.
            executor = null; // disable
            e.execute(this);
        }
        return false;
    }

    /** A completion is live while it still references its dependent. */
    final override bool isLive() { return dep !is null; }
}


/**
 * Completion that applies a function to the source's result and completes
 * the dependent with the function's return value.
 */
final class UniApply(T, V) : UniCompletion {

    // A void source has no value to pass, so the function takes no argument.
    static if(is(T == void)) {
        alias FunctionT(V) = Func!(V);
    } else {
        alias FunctionT(V) = Func1!(T, V);
    }

    FunctionT!(V) fn;
    this(Executor executor, CompletableFuture!(V) dep,
        CompletableFuture!(T) src,
        FunctionT!(V) fn) {
        super(executor, dep, src); this.fn = fn;
    }

    /**
     * Returns null when the dependent, function or source is missing, when the
     * source is not done, or when claim() fails in a non-ASYNC mode. Otherwise
     * completes the dependent (unless it is already done), clears the fields
     * and returns the result of {@code d.postFire(a, mode)}.
     */
    final override CompletableFuture!(V) tryFire(int mode) {
        CompletableFuture!(V) d = cast(CompletableFuture!(V))dep;
        CompletableFuture!(T) a = cast(CompletableFuture!(T))src;
        Throwable x; FunctionT!(V) f = fn;
        if (d is null || f is null || a is null || !a.isDone())
            return null;

        tryComplete: if (!d.isDone()) {
            AltResult ar = a.altResult;
            if (ar !is null) {
                if ((x = ar.ex) !is null) {
                    // Source failed: forward the exception without calling fn.
                    d.completeThrowable(x, ar);
                    // Jumps back to the label, which re-tests d.isDone();
                    // presumably the dependent is done by now, so the body is skipped.
                    goto tryComplete;
                }
            }
            try {
                if (mode <= 0 && !claim()) {
                    return null;
                } else {
                    static if(is(T == void)) {
                        d.completeValue(f());
                    } else {
                        T t = a.result;
                        d.completeValue(f(t));
                    }
                }
            } catch (Throwable ex) {
                // Anything thrown by claim() or fn becomes the dependent's outcome.
                d.completeThrowable(ex);
            }
        }
        dep = null; src = null; fn = null;

        return d.postFire(a, mode);
    }
}


/**
 * Completion that passes the source's result to a consumer and then
 * completes the void dependent via completeNull().
 */
final class UniAccept(T) : UniCompletion {

    // A void source has no value to pass, so the consumer takes no argument.
    static if(is(T == void)) {
        alias ConsumerT = Action;
    } else {
        alias ConsumerT = Consumer!(T);
    }
    ConsumerT fn;

    this(Executor executor, CompletableFuture!(void) dep,
        CompletableFuture!(T) src, ConsumerT fn) {
        super(executor, dep, src); this.fn = fn;
    }

    final
override CompletableFuture!(void) tryFire(int mode) {
        // Returns null when not yet triggerable or when claim() fails in a
        // non-ASYNC mode; otherwise returns d.postFire(a, mode).
        CompletableFuture!(void) d = cast(CompletableFuture!(void))dep;
        CompletableFuture!(T) a = cast(CompletableFuture!(T))src;
        Throwable x;
        ConsumerT f = fn;
        if (d is null || f is null
            || a is null || !a.isDone())
            return null;

        tryComplete: if (!d._isDone) {
            AltResult ar = a.altResult;
            if (ar !is null) {
                if ((x = ar.ex) !is null) {
                    // Source failed: forward the exception without calling fn.
                    d.completeThrowable(x, ar);
                    // Re-tests the labelled condition; presumably d is done now.
                    goto tryComplete;
                }
            }

            try {
                if (mode <= 0 && !claim()) {
                    return null;
                } else {
                    static if(is(T == void)) {
                        f();
                    } else {
                        T t = a.result;
                        f(t);
                    }
                    d.completeNull();
                }
            } catch (Throwable ex) {
                d.completeThrowable(ex);
            }
        }
        dep = null; src = null; fn = null;
        return d.postFire(a, mode);
    }
}


/**
 * Completion that runs a Runnable once the source is done and then
 * completes the void dependent via completeNull().
 */
final class UniRun(T) : UniCompletion {
    Runnable fn;

    this(Executor executor, CompletableFuture!(void) dep,
        CompletableFuture!(T) src, Runnable fn) {
        super(executor, dep, src); this.fn = fn;
    }

    /**
     * Returns null when not yet triggerable or when claim() fails in a
     * non-ASYNC mode; otherwise clears the fields and returns
     * {@code d.postFire(a, mode)}.
     */
    final override CompletableFuture!(void) tryFire(int mode) {
        CompletableFuture!(void) d = cast(CompletableFuture!(void))dep;
        CompletableFuture!(T) a = cast(CompletableFuture!(T))src;
        Throwable x;
        Runnable f = fn;
        if (d is null || f is null || a is null || !a.isDone())
            return null;

        if (!d.isDone()) {
            AltResult ar = a.altResult;
            if(ar !is null && (x = ar.ex) !is null) {
                // Source failed: forward the exception, the Runnable is not run.
                d.completeThrowable(x, ar);
            } else {
                try {
                    if (mode <= 0 && !claim())
                        return null;
                    else {
                        f.run();
                        d.completeNull();
                    }
                } catch (Throwable ex) {
                    d.completeThrowable(ex);
                }
            }
        }
        dep = null; src = null; fn = null;
        return d.postFire(a, mode);
    }
}


/**
 * Completion backing whenComplete: the actual work is delegated to
 * {@code uniWhenComplete} on the dependent.
 */
final class UniWhenComplete(T) : UniCompletion {

    // For a void source the callback receives only the Throwable.
    static if(is(T == void)) {
        alias
BiConsumerT = Action1!(Throwable);
    } else {
        alias BiConsumerT = BiConsumer!(T, Throwable);
    }

    BiConsumerT fn;
    this(Executor executor, CompletableFuture!(T) dep,
        CompletableFuture!(T) src,
        BiConsumerT fn) {
        super(executor, dep, src); this.fn = fn;
    }

    /**
     * Returns null when not yet triggerable or when
     * {@code d.uniWhenComplete} reports false; otherwise clears the fields
     * and returns {@code d.postFire(a, mode)}.
     */
    final override CompletableFuture!(T) tryFire(int mode) {
        CompletableFuture!(T) d = cast(CompletableFuture!(T))dep;
        CompletableFuture!(T) a = cast(CompletableFuture!(T))src;
        BiConsumerT f = fn;

        // In ASYNC mode (mode > 0) null is passed instead of this completion.
        if (d is null || f is null
            || a is null || !a.isDone()
            || !d.uniWhenComplete(a, f, mode > 0 ? null : this))
            return null;
        dep = null; src = null; fn = null;
        return d.postFire(a, mode);
    }
}



/**
 * Completion backing handle: the actual work is delegated to
 * {@code uniHandle} on the dependent.
 */
final class UniHandle(T, V) : UniCompletion {
    BiFunction!(T, Throwable, V) fn;
    this(Executor executor, CompletableFuture!(V) dep,
        CompletableFuture!(T) src,
        BiFunction!(T, Throwable, V) fn) {
        super(executor, dep, src); this.fn = fn;
    }

    /**
     * Returns null when not yet triggerable or when {@code d.uniHandle}
     * reports false; otherwise clears the fields and returns
     * {@code d.postFire(a, mode)}.
     */
    final override CompletableFuture!(V) tryFire(int mode) {
        CompletableFuture!(V) d = cast(CompletableFuture!(V))dep;
        CompletableFuture!(T) a = cast(CompletableFuture!(T))src;
        BiFunction!(T, Throwable, V) f = fn;

        // In ASYNC mode (mode > 0) null is passed instead of this completion.
        if (d is null || f is null || a is null || !a.isDone()
            || !d.uniHandle!(T)(a, f, mode > 0 ?
null : this))
            return null;
        dep = null; src = null; fn = null;
        return d.postFire(a, mode);
    }
}


/**
 * Completion backing exceptionally: the actual work is delegated to
 * {@code uniExceptionally} on the dependent. Constructed without an executor.
 */
final class UniExceptionally(T) : UniCompletion {
    Function!(Throwable, T) fn;

    this(CompletableFuture!(T) dep, CompletableFuture!(T) src,
        Function!(Throwable, T) fn) {
        super(null, dep, src); this.fn = fn;
    }

    /**
     * Returns null when not yet triggerable or when
     * {@code d.uniExceptionally} reports false; otherwise clears the fields
     * and returns {@code d.postFire(a, mode)}. This completion is always
     * passed along, whatever the mode.
     */
    final override CompletableFuture!(T) tryFire(int mode) { // never ASYNC
        // assert mode != ASYNC;
        CompletableFuture!(T) d = cast(CompletableFuture!(T))dep;
        CompletableFuture!(T) a = cast(CompletableFuture!(T))src;
        Function!(Throwable, T) f = fn;

        if (d is null || f is null || a is null || !a.isDone()
            || !d.uniExceptionally(a, f, this))
            return null;

        dep = null; src = null; fn = null;
        return d.postFire(a, mode);
    }
}


/**
 * Completion that copies the outcome of the source into the dependent.
 * Constructed without an executor.
 */
final class UniRelay(U, T : U) : UniCompletion {
    this(CompletableFuture!(U) dep, CompletableFuture!(T) src) {
        super(null, dep, src);
    }

    /**
     * Returns null while the dependent or source is missing or the source is
     * not done; otherwise relays the outcome (unless the dependent is already
     * done), clears the fields and returns {@code d.postFire(a, mode)}.
     */
    final override CompletableFuture!(U) tryFire(int mode) {
        CompletableFuture!(U) d; CompletableFuture!(T) a;
        if ((d = cast(CompletableFuture!(U))dep) is null
            || (a = cast(CompletableFuture!(T))src) is null || !a.isDone())
            return null;

        if (!d.isDone()) {
            if(a.isFaulted()) {
                // Faulted source: hand over its AltResult.
                d.completeValue(a.altResult);
            } else {
                static if(is(T == void)) {
                    d.completeValue();
                } else {
                    d.completeValue(a.result);
                }
            }
            // d.completeRelay(r);
        }
        src = null; dep = null;
        return d.postFire(a, mode);
    }
}


/**
 * Completion backing compose: the function yields another stage whose
 * outcome is relayed into the dependent.
 */
final class UniCompose(T, V) : UniCompletion {

    // A void source has no value to pass, so the function takes no argument.
    static if(is(T == void)) {
        alias FunctionT(V) = Func!(V);
    } else {
        alias FunctionT(V) = Func1!(T, V);
    }

    FunctionT!(CompletionStage!(V)) fn;

    this(Executor executor, CompletableFuture!(V) dep,
        CompletableFuture!(T) src,
        FunctionT!(CompletionStage!(V)) fn) {
super(executor, dep, src); this.fn = fn;
    }

    /**
     * Runs the composing function once the source is done and relays the
     * outcome of the stage it returns into the dependent.
     *
     * Returns null when not yet triggerable, when claim() fails in a
     * non-ASYNC mode, or when the returned stage is still pending (a
     * UniRelay is pushed onto it in that case). Otherwise clears the fields
     * and returns {@code d.postFire(a, mode)}.
     *
     * A null stage returned by the function completes the dependent
     * exceptionally with a NullPointerException instead of being dereferenced.
     *
     * @param mode SYNC, ASYNC, or NESTED
     */
    final override CompletableFuture!(V) tryFire(int mode) {
        CompletableFuture!(V) d = cast(CompletableFuture!(V))dep;
        CompletableFuture!(T) a = cast(CompletableFuture!(T))src;
        FunctionT!(CompletionStage!(V)) f = fn;
        Throwable x;

        if (d is null || f is null
            || a is null || !a.isDone())
            return null;

        AltResult ar = a.altResult;
        // A void source carries no value; only declare/read one otherwise.
        static if(!is(T == void)) {
            T t = a.result;
        }

        tryComplete: if (!d.isDone) {
            if (ar !is null) {
                if ((x = ar.ex) !is null) {
                    // Source failed: forward the exception without calling fn.
                    d.completeThrowable(x, ar);
                    // Re-tests the labelled condition; presumably d is done now.
                    goto tryComplete;
                }
                warning("To check");
                static if(!is(T == void)) {
                    t = T.init;
                }
            }

            try {
                if (mode <= 0 && !claim())
                    return null;

                static if(is(T == void)) {
                    CompletionStage!V ff = f();
                } else {
                    CompletionStage!V ff = f(t);
                }
                // Fail through the catch below rather than dereferencing null.
                if (ff is null)
                    throw new NullPointerException();

                CompletableFuture!(V) g = cast(CompletableFuture!(V))(ff.toCompletableFuture());
                if (g is null)
                    throw new NullPointerException();

                if (g.isDone()) {
                    if(g.isFaulted())
                        d.completeValue(g.altResult);
                    else {
                        // Same void handling as UniRelay.
                        static if(is(V == void)) {
                            d.completeValue();
                        } else {
                            d.completeValue(g.result);
                        }
                    }
                }
                else {
                    g.unipush(new UniRelay!(V,V)(d, g));
                    if (!d.isDone())
                        return null;
                }
            } catch (Throwable ex) {
                d.completeThrowable(ex);
            }
        }
        dep = null; src = null; fn = null;
        return d.postFire(a, mode);
    }
}


/** A Completion for an action with two sources */

abstract class BiCompletion : UniCompletion {
    AbstractCompletableFuture snd; // second source for action

    this(Executor executor, AbstractCompletableFuture dep,
        AbstractCompletableFuture src, AbstractCompletableFuture snd) {
        super(executor, dep, src); this.snd = snd;
    }
}

/** A Completion delegating to a BiCompletion */

final class CoCompletion : Completion {
    BiCompletion base;

    this(BiCompletion base) { this.base = base; }

    final override AbstractCompletableFuture tryFire(int mode) {
        BiCompletion c; AbstractCompletableFuture d;
        if ((c = base) is null || (d = c.tryFire(mode)) is
null)
            return null;
        // The delegate fired: drop the reference so isLive() reports false.
        base = null; // detach
        return d;
    }

    /** Live while a base completion is attached and it still has a dependent. */
    final override bool isLive() {
        BiCompletion c;
        return (c = base) !is null
            // && c.isLive()
            && c.dep !is null;
    }
}


/**
 * Two-source completion backing the "both, then apply" form: the actual
 * work is delegated to {@code biApply} on the dependent.
 */
final class BiApply(T, U, V) : BiCompletion {
    BiFunction!(T, U, V) fn;

    this(Executor executor, CompletableFuture!(V) dep,
        CompletableFuture!(T) src, CompletableFuture!(U) snd,
        BiFunction!(T, U, V) fn) {
        super(executor, dep, src, snd); this.fn = fn;
    }

    /**
     * Returns null until both sources are done or when {@code d.biApply}
     * reports false; otherwise clears the fields and returns
     * {@code d.postFire(a, b, mode)}.
     */
    final override CompletableFuture!(V) tryFire(int mode) {
        CompletableFuture!(V) d = cast(CompletableFuture!(V))dep;
        CompletableFuture!(T) a = cast(CompletableFuture!(T))src;
        CompletableFuture!(U) b = cast(CompletableFuture!(U))snd;
        BiFunction!(T, U, V) f = fn;

        // In ASYNC mode (mode > 0) null is passed instead of this completion.
        if (d is null || f is null || a is null || !a.isDone()
            || b is null || !b.isDone()
            || !d.biApply!(T, U)(a, b, f, mode > 0 ? null : this))
            return null;

        dep = null; src = null; snd = null; fn = null;
        return d.postFire(a, b, mode);
    }
}


/**
 * Two-source completion backing the "both, then accept" form: the actual
 * work is delegated to {@code biAccept} on the dependent.
 */
final class BiAccept(T, U) : BiCompletion {
    BiConsumer!(T, U) fn;

    this(Executor executor, CompletableFuture!(void) dep,
        CompletableFuture!T src, CompletableFuture!U snd,
        BiConsumer!(T, U) fn) {
        super(executor, dep, src, snd); this.fn = fn;
    }

    /**
     * Returns null until both sources are done or when {@code d.biAccept}
     * reports false; otherwise clears the fields and returns
     * {@code d.postFire(a, b, mode)}.
     */
    final override CompletableFuture!(void) tryFire(int mode) {
        CompletableFuture!(void) d = cast(CompletableFuture!(void))dep;
        CompletableFuture!T a = cast(CompletableFuture!(T))src;
        CompletableFuture!U b = cast(CompletableFuture!(U))snd;
        BiConsumer!(T, U) f = fn;
        if (d is null || f is null
            || a is null || !a.isDone()
            || b is null || !b.isDone()
            || !d.biAccept!(T, U)(a, b, f, mode > 0 ?
null : this))
            return null;
        dep = null; src = null; snd = null; fn = null;
        return d.postFire(a, b, mode);
    }
}


/**
 * Two-source completion backing the "both, then run" form: the actual
 * work is delegated to {@code biRun} on the dependent.
 */
final class BiRun(T, U) : BiCompletion {
    Runnable fn;

    this(Executor executor, CompletableFuture!(void) dep,
        CompletableFuture!(T) src, CompletableFuture!(U) snd,
        Runnable fn) {
        super(executor, dep, src, snd); this.fn = fn;
    }

    /**
     * Returns null until both sources are done or when {@code d.biRun}
     * reports false; otherwise clears the fields and returns
     * {@code d.postFire(a, b, mode)}.
     */
    final override CompletableFuture!(void) tryFire(int mode) {
        CompletableFuture!(void) d = cast(CompletableFuture!(void))dep;
        CompletableFuture!(T) a = cast(CompletableFuture!(T))src;
        CompletableFuture!(U) b = cast(CompletableFuture!(U))snd;
        Runnable f = fn;
        // In ASYNC mode (mode > 0) null is passed instead of this completion.
        if (d is null || f is null
            || a is null || !a.isDone()
            || b is null || !b.isDone()
            || !d.biRun(a, b, f, mode > 0 ? null : this))
            return null;

        dep = null; src = null; snd = null; fn = null;
        return d.postFire(a, b, mode);
    }
}


/**
 * Two-source completion with no action and no executor: the dependent
 * completes once both sources are done.
 */
final class BiRelay : BiCompletion { // for And
    this(CompletableFuture!(void) dep,
        AbstractCompletableFuture src, AbstractCompletableFuture snd) {
        super(null, dep, src, snd);
    }

    /**
     * Returns null until both sources are done. Otherwise completes the
     * dependent (unless it is already done), clears the fields and returns
     * {@code d.postFire(a, b, mode)}.
     */
    final override CompletableFuture!(void) tryFire(int mode) {
        CompletableFuture!(void) d = cast(CompletableFuture!(void))dep;
        AbstractCompletableFuture a;
        AbstractCompletableFuture b;
        Throwable x;

        if (d is null || (a = src) is null || !a.isDone()
            || (b = snd) is null || !b.isDone())
            return null;

        if (!d.isDone()) {
            AltResult ar = a.altResult;
            AltResult ars = b.altResult;
            // The first source's exception takes precedence over the second's.
            if(ar !is null && (x = ar.ex) !is null){
                d.completeThrowable(x, ar);
            } else if(ars !is null && (x = ars.ex) !is null){
                d.completeThrowable(x, ars);
            }
            else
                d.completeNull();
        }
        src = null; snd = null; dep = null;
        return d.postFire(a, b, mode);
    }
}


/**
 * Two-source completion that applies a function to the result of
 * whichever source is found done.
 */
final class OrApply(T, U : T, V) : BiCompletion {
static if(is(T == void)) {
        alias FunctionT(V) = Func!(V);
    } else {
        alias FunctionT(V) = Func1!(T, V);
    }

    FunctionT!(V) fn;

    this(Executor executor, CompletableFuture!(V) dep,
        CompletableFuture!(T) src, CompletableFuture!(U) snd,
        FunctionT!(V) fn) {
        super(executor, dep, src, snd); this.fn = fn;
    }

    /**
     * Fires once either source is done: forwards that source's exception,
     * or applies the function to its result and completes the dependent
     * with the returned value.
     *
     * Returns null when not yet triggerable or when claim() fails in a
     * non-ASYNC mode; otherwise clears the fields and returns
     * {@code d.postFire(a, b, mode)}.
     *
     * For a void source type no value is read or passed, matching the
     * handling in OrAccept.
     *
     * @param mode SYNC, ASYNC, or NESTED
     */
    final override CompletableFuture!(V) tryFire(int mode) {
        CompletableFuture!(V) d = cast(CompletableFuture!(V))dep;
        CompletableFuture!(T) a = cast(CompletableFuture!(T))src;
        CompletableFuture!(U) b = cast(CompletableFuture!(U))snd;
        Throwable x;
        FunctionT!(V) f = fn;

        if (d is null || f is null || a is null || b is null
            || (!a.isDone() && !b.isDone()))
            return null;

        // Take the outcome of the first source, falling back to the second.
        static if(is(T == void)) {
            AltResult ar;
            if(a.isDone()) {
                ar = a.altResult;
            } else if(b.isDone()) {
                ar = b.altResult;
            } else {
                warning("unhandled status");
            }
        } else {
            T t;
            AltResult ar;
            if(a.isDone()) {
                t = a.result;
                ar = a.altResult;
            } else if(b.isDone()) {
                t = b.result;
                ar = b.altResult;
            } else {
                warning("unhandled status");
            }
        }

        tryComplete: if (!d.isDone()) {
            try {
                if (mode <= 0 && !claim())
                    return null;

                if (ar !is null) {
                    if ((x = ar.ex) !is null) {
                        d.completeThrowable(x, ar);
                        // Re-tests the labelled condition; presumably d is done now.
                        goto tryComplete;
                    }
                }
                static if(is(T == void)) {
                    d.completeValue(f());
                } else {
                    d.completeValue(f(t));
                }
            } catch (Throwable ex) {
                d.completeThrowable(ex);
            }
        }
        dep = null; src = null; snd = null; fn = null;
        return d.postFire(a, b, mode);
    }
}


/**
 * Two-source completion that passes the result of whichever source is
 * found done to a consumer.
 */
final class OrAccept(T, U : T) : BiCompletion {
    // A void source has no value to pass, so the consumer takes no argument.
    static if(is(T == void)) {
        alias ConsumerT = Action;
    } else {
        alias ConsumerT = Consumer!(T);
    }

    ConsumerT fn;
    this(Executor executor, CompletableFuture!(void) dep,
        CompletableFuture!(T) src, CompletableFuture!(U) snd,
        ConsumerT fn) {
        super(executor, dep, src, snd); this.fn = fn;
    }

    final override CompletableFuture!(void) tryFire(int mode) {
        CompletableFuture!(void) d = cast(CompletableFuture!(void))dep;
CompletableFuture!(T) a = cast(CompletableFuture!(T))src;
        CompletableFuture!(U) b = cast(CompletableFuture!(U))snd;
        Throwable x;
        ConsumerT f = fn;

        // Triggerable as soon as either source is done.
        if (d is null || f is null || a is null || b is null
            || (!a.isDone() && !b.isDone()))
            return null;

        // Take the outcome of the first source, falling back to the second.
        static if(is(T == void)) {
            AltResult ar;
            if(a.isDone()) {
                ar = a.altResult;
            } else if(b.isDone()) {
                ar = b.altResult;
            } else {
                warning("unhandled status");
            }
        } else {
            T t;
            AltResult ar;
            if(a.isDone()) {
                t = a.result;
                ar = a.altResult;
            } else if(b.isDone()) {
                t = b.result;
                ar = b.altResult;
            } else {
                warning("unhandled status");
            }
        }
        tryComplete: if (!d.isDone()) {
            try {
                if (mode <= 0 && !claim())
                    return null;

                if (ar !is null) {
                    if ((x = ar.ex) !is null) {
                        // Source failed: forward the exception without calling fn.
                        d.completeThrowable(x, ar);
                        // Re-tests the labelled condition; presumably d is done now.
                        goto tryComplete;
                    }
                }
                static if(is(T == void)) {
                    f();
                } else {
                    f(t);
                }
                d.completeNull();
            } catch (Throwable ex) {
                d.completeThrowable(ex);
            }
        }
        dep = null; src = null; snd = null; fn = null;
        return d.postFire(a, b, mode);
    }
}


/**
 * Two-source completion that runs a Runnable once either source is done
 * and then completes the void dependent.
 */
final class OrRun : BiCompletion {
    Runnable fn;

    this(Executor executor, CompletableFuture!(void) dep,
        AbstractCompletableFuture src, AbstractCompletableFuture snd,
        Runnable fn) {
        super(executor, dep, src, snd); this.fn = fn;
    }

    /**
     * Returns null when not yet triggerable or when claim() fails in a
     * non-ASYNC mode; otherwise clears the fields and returns
     * {@code d.postFire(a, b, mode)}.
     */
    final override CompletableFuture!(void) tryFire(int mode) {
        CompletableFuture!(void) d = cast(CompletableFuture!(void))dep;
        AbstractCompletableFuture a;
        AbstractCompletableFuture b;
        Throwable x; Runnable f = fn;
        if (d is null || f is null
            || (a = src) is null || (b = snd) is null
            || (!a.isDone() && !b.isDone()))
            return null;

        if (!d._isDone) {
            try {
                if (mode <= 0 && !claim())
                    return null;
else {
                    // Use the outcome of the first source, falling back to the second.
                    AltResult ar;
                    if(a.isDone())
                        ar = a.altResult;
                    else if(b.isDone)
                        ar = b.altResult;

                    if (ar !is null && (x = ar.ex) !is null)
                        d.completeThrowable(x, ar);
                    else {
                        f.run();
                        d.completeNull();
                    }
                }
            } catch (Throwable ex) {
                d.completeThrowable(ex);
            }
        }
        dep = null; src = null; snd = null; fn = null;
        return d.postFire(a, b, mode);
    }
}


/** Completion for an anyOf input future. */

static class AnyOf(T) : Completion {
    CompletableFuture!(T) dep;
    CompletableFuture!(T) src;
    CompletableFuture!(T)[] srcs;

    this(CompletableFuture!(T) dep, CompletableFuture!(T) src,
        CompletableFuture!(T)[] srcs) {
        this.dep = dep; this.src = src; this.srcs = srcs;
    }

    /**
     * Once the source is done, clears the fields and tries to complete the
     * dependent with the source's outcome. If that completion succeeds,
     * cleanStack() is called on every other input; the dependent is then
     * returned in NESTED mode (mode < 0) or post-completed here otherwise.
     * Returns null in all other cases.
     */
    final override CompletableFuture!(T) tryFire(int mode) {
        // assert mode != ASYNC;
        CompletableFuture!(T) d = dep;
        CompletableFuture!(T) a = src;
        CompletableFuture!(T)[] as = srcs;
        if (d is null || a is null || !a.isDone()
            || as is null)
            return null;

        dep = null; src = null; srcs = null;
        bool r=false;
        if(a.isFaulted())
            r = d.completeValue(a.altResult);
        else
            r = d.completeValue(a.result);
        if (r) {
            foreach (AbstractCompletableFuture b; as)
                if (b !is a)
                    b.cleanStack();
            if (mode < 0)
                return d;
            else
                d.postComplete();
        }
        return null;
    }

    /** Live while a dependent is attached and that dependent is not done. */
    final override bool isLive() {
        CompletableFuture!(T) d;
        return (d = dep) !is null && !d.isDone();
    }
}


/* ------------- Zero-input Async forms -------------- */


/** Task that completes a future with the value produced by a supplier. */
final class AsyncSupply(T) : ForkJoinTask!(void), Runnable,
    AsynchronousCompletionTask {

    CompletableFuture!(T) dep;
    Supplier!(T) fn;

    this(CompletableFuture!(T) dep, Supplier!(T) fn) {
        this.dep = dep; this.fn = fn;
    }

    // final override void
// getRawResult() { return null; }
    // final override void setRawResult(void v) {}

    /** ForkJoinTask entry point: delegates to run() and always reports false. */
    final override bool exec() {
        run();
        return false;
    }

    /**
     * Invokes the supplier and completes the dependent with its value, or
     * with the Throwable it raised. Does nothing when either field is null.
     */
    void run() {
        CompletableFuture!(T) target = dep;
        if (target is null)
            return;
        Supplier!(T) supplier = fn;
        if (supplier is null)
            return;

        // Clear the fields before running; a later run() then sees null and returns.
        dep = null;
        fn = null;

        if (!target._isDone) {
            try {
                static if(is(T == void)) {
                    supplier();
                    target.completeValue();
                } else {
                    target.completeValue(supplier());
                }
            } catch (Throwable ex) {
                target.completeThrowable(ex);
            }
        }
        target.postComplete();
    }
}



/** Task that runs an action and then completes a void future. */
final class AsyncRun : ForkJoinTask!(void), Runnable, AsynchronousCompletionTask {
    CompletableFuture!(void) dep;
    Action fn;

    this(CompletableFuture!(void) dep, Action fn) {
        this.dep = dep;
        this.fn = fn;
    }

    // final override void getRawResult() { return null; }
    // final override void setRawResult(void v) {}

    /** ForkJoinTask entry point: delegates to run() and always reports false. */
    final override bool exec() {
        run();
        return false;
    }

    /**
     * Invokes the action and completes the dependent via completeNull(), or
     * logs and records the Throwable it raised. Does nothing when either
     * field is null.
     */
    void run() {
        CompletableFuture!(void) target = dep;
        if (target is null)
            return;
        Action action = fn;
        if (action is null)
            return;

        // Clear the fields before running; a later run() then sees null and returns.
        dep = null;
        fn = null;

        if (!target.isDone) {
            try {
                action();
                target.completeNull();
            } catch (Throwable ex) {
                warning(ex);
                target.completeThrowable(ex);
            }
        }
        target.postComplete();
    }
}


/* ------------- Signallers -------------- */

/**
 * Completion for recording and releasing a waiting thread.  This
 * class implements ManagedBlocker to avoid starvation when
 * blocking actions pile up in ForkJoinPools.
*/

final class Signaller : Completion, ManagedBlocker {
    Duration remaining;          // remaining wait time if timed
    MonoTime deadline;           // non-zero if timed
    bool interruptible;
    bool interrupted;
    Thread thread;

    /** Records the constructing thread (Thread.getThis()) as the waiter. */
    this(bool interruptible, Duration remaining, MonoTime deadline) {
        this.thread = Thread.getThis();
        this.interruptible = interruptible;
        this.remaining = remaining;
        this.deadline = deadline;
    }

    /** Clears the recorded thread and unparks it; always returns null. */
    final override AbstractCompletableFuture tryFire(int ignore) {
        Thread w = thread; // no need to atomically claim
        if (w !is null) {
            thread = null;
            LockSupport.unpark(w);
        }
        return null;
    }

    /**
     * Returns true when waiting should stop: an interrupt was observed and
     * this signaller is interruptible, the deadline (if any) has passed, or
     * the thread has been cleared by tryFire. As a side effect, records an
     * observed interrupt and refreshes {@code remaining} for timed waits.
     */
    bool isReleasable() {
        if (ThreadEx.interrupted())
            interrupted = true;
        // Evaluation order matters: remaining is only recomputed for a timed
        // wait whose previously stored remaining time is still positive.
        return ((interrupted && interruptible) ||
                (deadline != MonoTime.zero &&
                 (remaining <= Duration.zero ||
                  (remaining = deadline - MonoTime.currTime) <= Duration.zero)) ||
                thread is null);
    }

    /** Parks until isReleasable() holds, using a timed park when a deadline is set. */
    bool block() {
        while (!isReleasable()) {
            if (deadline == MonoTime.zero)
                LockSupport.park(this);
            else
                LockSupport.park(this, remaining);
        }
        return true;
    }

    /** Live while a waiting thread is still recorded. */
    final override bool isLive() { return thread !is null; }
}


/**
 * Singleton delay scheduler, used only for starting and
 * cancelling tasks.
*/


final class Delayer {
    /** Schedules the command on the shared scheduler after the given delay. */
    static ScheduledFuture!(void) delay(Runnable command, Duration delay) {
        return delayer.schedule(command, delay);
    }

    __gshared ScheduledThreadPoolExecutor delayer;

    // One scheduler thread, created through DaemonThreadFactory; cancelled
    // tasks are removed from the queue.
    shared static this() {
        delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
        delayer.setRemoveOnCancelPolicy(true);
    }
}

/** Thread factory producing daemon threads for the delay scheduler. */
final class DaemonThreadFactory : ThreadFactory {
    ThreadEx newThread(Runnable runnable) {
        ThreadEx worker = new ThreadEx(runnable, "CompletableFutureDelayScheduler");
        worker.isDaemon = true;
        // worker.name = "CompletableFutureDelayScheduler";
        return worker;
    }
}

// Little class-ified lambdas to better support monitoring

/** Executor that forwards each task to a base executor after a fixed delay. */
final class DelayedExecutor : Executor {
    Duration delay;
    Executor executor;

    this(Duration delay, Executor executor) {
        this.delay = delay;
        this.executor = executor;
    }

    void execute(Runnable r) {
        TaskSubmitter submitter = new TaskSubmitter(executor, r);
        Delayer.delay(submitter, delay);
    }
}

/** Action to submit user task */
final class TaskSubmitter : Runnable {
    Executor executor;
    Runnable action;

    this(Executor executor, Runnable action) {
        this.executor = executor;
        this.action = action;
    }

    void run() {
        executor.execute(action);
    }
}

/** Action to completeExceptionally on timeout */
final class Timeout : Runnable {
    AbstractCompletableFuture f;

    this(AbstractCompletableFuture f) {
        this.f = f;
    }

    void run() {
        // Nothing to do without a future, or for one that is already done.
        if (f is null || f.isDone())
            return;
        f.completeExceptionally(new TimeoutException());
    }
}

/** Action to complete on timeout */
final class DelayedCompleter(U) : Runnable if(!is(U == void)) {
    CompletableFuture!(U) f;
    U u;

    this(CompletableFuture!(U) f, U u) {
        this.f = f;
        this.u = u;
    }

    void run() {
        if (f is null)
            return;
        f.complete(u);
    }
}

/** Action to complete a void future on timeout */
final class DelayedCompleter(U) : Runnable if(is(U == void)) {
    CompletableFuture!(U) f;

    this(CompletableFuture!(U) f) { this.f = f; }

    void run() {
        if (f !is null) {
            f.complete();
        }
    }
}


/**
 * A subclass that just throws UOE for most non-CompletionStage methods.
 */
final class MinimalStage(T) : CompletableFuture!(T) {
    this() { }
    this(AltResult r) { super(r); }

    override CompletableFuture!(U) newIncompleteFuture(U)() {
        return new MinimalStage!(U)(); }
    override T get() {
        throw new UnsupportedOperationException(); }
    override T get(Duration timeout) {
        throw new UnsupportedOperationException(); }
    override T join() {
        throw new UnsupportedOperationException(); }
    override bool completeExceptionally(Throwable ex) {
        throw new UnsupportedOperationException(); }
    override bool cancel(bool mayInterruptIfRunning) {
        throw new UnsupportedOperationException(); }
    override void obtrudeException(Throwable ex) {
        throw new UnsupportedOperationException(); }
    override bool isDone() {
        throw new UnsupportedOperationException(); }
    override bool isCancelled() {
        throw new UnsupportedOperationException(); }
    override bool isCompletedExceptionally() {
        throw new UnsupportedOperationException(); }
    override int getNumberOfDependents() {
        throw new UnsupportedOperationException(); }
    override CompletableFuture!(T) completeAsync
        (Supplier!(T) supplier, Executor executor) {
        throw new UnsupportedOperationException(); }
    override CompletableFuture!(T) completeAsync
        (Supplier!(T) supplier) {
        throw new UnsupportedOperationException(); }
    override CompletableFuture!(T) orTimeout
        (Duration timeout) {
        throw new UnsupportedOperationException(); }

    /**
     * Returns a full CompletableFuture mirroring this stage: an already
     * completed copy when this stage is done, otherwise a new future that
     * is fed through a UniRelay pushed onto this stage.
     */
    override CompletableFuture!(T) toCompletableFuture() {
        // Must bypass this class's own isDone() override, which always throws
        // UnsupportedOperationException; ask the base implementation instead.
        if (super.isDone()) {
            if(isFaulted()) {
return new CompletableFuture!(T)(this.altResult);
            } else {
                static if(is(T == void)) {
                    return new CompletableFuture!(T)(true);
                } else {
                    return new CompletableFuture!(T)(this.result);
                }
            }
        } else {
            // Not done yet: relay the eventual outcome into a fresh future.
            CompletableFuture!(T) d = new CompletableFuture!T();
            unipush(new UniRelay!(T,T)(d, this));
            return d;
        }
    }

    // The value-related members differ in signature between the void and
    // non-void instantiations; every override here rejects the call.
    static if(is(T == void)) {

        this(bool r) { super(r); }

        override T getNow() {
            throw new UnsupportedOperationException(); }
        override bool complete() {
            throw new UnsupportedOperationException(); }

        override void obtrudeValue() {
            throw new UnsupportedOperationException(); }

        override CompletableFuture!(T) completeOnTimeout(Duration timeout) {
            throw new UnsupportedOperationException(); }

    } else {

        this(T r) { super(r); }

        override T getNow(T valueIfAbsent) {
            throw new UnsupportedOperationException(); }
        override bool complete(T value) {
            throw new UnsupportedOperationException(); }

        override void obtrudeValue(T value) {
            throw new UnsupportedOperationException(); }

        override CompletableFuture!(T) completeOnTimeout(T value, Duration timeout) {
            throw new UnsupportedOperationException(); }
    }

}



/**
 * Null-checks user executor argument, and translates uses of
 * commonPool to ASYNC_POOL in case parallelism disabled.
 *
 * @param e the executor supplied by the caller
 * @return ASYNC_POOL when the common pool is not usable and {@code e} is the
 *         common pool; otherwise {@code e} itself
 * @throws NullPointerException if {@code e} is null
 */
Executor screenExecutor(Executor e) {
    // The common-pool substitution is tested before the null check.
    if (!USE_COMMON_POOL && e is ForkJoinPool.commonPool())
        return ASYNC_POOL;
    if (e is null) throw new NullPointerException();
    return e;
}


/**
 * Returns a new CompletableFuture that is asynchronously completed
 * by a task running in the {@link ForkJoinPool#commonPool()} with
 * the value obtained by calling the given Supplier.
*
 * @param supplier a function returning the value to be used
 * to complete the returned CompletableFuture
 * @param <U> the function's return type
 * @return the new CompletableFuture
 */
CompletableFuture!(U) supplyAsync(U)(Supplier!(U) supplier) {
    Executor pool = ASYNC_POOL;
    return asyncSupplyStage!(U)(pool, supplier);
}

/**
 * Returns a new CompletableFuture that is asynchronously completed
 * by a task running in the given executor with the value obtained
 * by calling the given Supplier.
 *
 * @param supplier a function returning the value to be used
 * to complete the returned CompletableFuture
 * @param executor the executor to use for asynchronous execution
 * @param <U> the function's return type
 * @return the new CompletableFuture
 */
CompletableFuture!(U) supplyAsync(U)(Supplier!(U) supplier, Executor executor) {
    Executor screened = screenExecutor(executor);
    return asyncSupplyStage!(U)(screened, supplier);
}

/**
 * Returns a new CompletableFuture that is asynchronously completed
 * by a task running in the {@link ForkJoinPool#commonPool()} after
 * it runs the given action.
 *
 * @param runnable the action to run before completing the
 * returned CompletableFuture
 * @return the new CompletableFuture
 */
CompletableFuture!(void) runAsync(Runnable runnable) {
    if (runnable is null) {
        throw new NullPointerException();
    }
    // Adapt the Runnable to the Action form expected by asyncRunStage.
    return asyncRunStage(ASYNC_POOL, () { runnable.run(); });
}


/**
 * Same as the Runnable overload, for an action that is already in
 * Action form.
 *
 * @param act the action to run before completing the
 * returned CompletableFuture
 * @return the new CompletableFuture
 */
CompletableFuture!(void) runAsync(Action act) {
    if (act is null) {
        throw new NullPointerException();
    }
    return asyncRunStage(ASYNC_POOL, act);
}

/**
 * Returns a new CompletableFuture that is asynchronously completed
 * by a task running in the given executor after it runs the given
 * action.
*
 * @param runnable the action to run before completing the
 * returned CompletableFuture
 * @param executor the executor to use for asynchronous execution
 * @return the new CompletableFuture
 */
CompletableFuture!(void) runAsync(Runnable runnable, Executor executor) {
    if (runnable is null) {
        throw new NullPointerException();
    }
    // Adapt the Runnable to the Action form expected by asyncRunStage.
    return asyncRunStage(screenExecutor(executor), () { runnable.run(); });
}

/**
 * Returns a new CompletableFuture that is already completed with
 * the given value.
 *
 * @param value the value
 * @param <U> the type of the value
 * @return the completed CompletableFuture
 */
CompletableFuture!(U) completedFuture(U)(U value) {
    // Only reference types can be null; those are encoded through NIL.
    static if(is(U == class) || is(U == interface)) {
        if (value is null) {
            return new CompletableFuture!(U)(NIL);
        }
    }
    return new CompletableFuture!(U)(value);
}

/* ------------- Zero-input Async forms -------------- */

/**
 * Creates a new future and submits an AsyncSupply task for it to the
 * given executor.
 */
CompletableFuture!(U) asyncSupplyStage(U)(Executor e,
                                             Supplier!(U) f) {
    if (f is null) {
        throw new NullPointerException();
    }
    CompletableFuture!(U) future = new CompletableFuture!(U)();
    AsyncSupply!(U) task = new AsyncSupply!(U)(future, f);
    e.execute(task);
    return future;
}


/**
 * Creates a new void future and submits an AsyncRun task for it to the
 * given executor.
 */
CompletableFuture!(void) asyncRunStage(Executor e, Action f) {
    if (f is null) {
        throw new NullPointerException();
    }
    CompletableFuture!(void) future = new CompletableFuture!(void)();
    AsyncRun task = new AsyncRun(future, f);
    e.execute(task);
    return future;
}


/**
 * Returns a new Executor that submits a task to the given base
 * executor after the given delay (or no delay if non-positive).
 * Each delay commences upon invocation of the returned executor's
 * {@code execute} method.
*
 * @param delay how long to delay
 * @param executor the base executor
 * @return the new delayed executor
 * @throws NullPointerException if executor is null
 */
Executor delayedExecutor(Duration delay, Executor executor) {
    if (executor is null)
        throw new NullPointerException();
    return new DelayedExecutor(delay, executor);
}

/**
 * Returns a new Executor that submits a task to the default
 * executor after the given delay (or no delay if non-positive).
 * Each delay commences upon invocation of the returned executor's
 * {@code execute} method.
 *
 * @param delay how long to delay
 * @return the new delayed executor
 */
Executor delayedExecutor(Duration delay) {
    return new DelayedExecutor(delay, ASYNC_POOL);
}

/**
 * Returns a new CompletionStage that is already completed with
 * the given value and supports only those methods in
 * interface {@link CompletionStage}.
 *
 * A null check is only meaningful (and only compiles) for class and
 * interface types, so it is performed under {@code static if}, the same
 * way completedFuture does it; a null reference is stored as NIL.
 *
 * @param value the value
 * @param <U> the type of the value
 * @return the completed CompletionStage
 */
CompletionStage!(U) completedStage(U)(U value) {
    static if (is(U == class) || is(U == interface)) {
        if (value is null)
            return new MinimalStage!(U)(NIL);
    }
    // NOTE(review): relies on MinimalStage!(U) accepting a U value, as
    // CompletableFuture!(U) does in completedFuture -- confirm.
    return new MinimalStage!(U)(value);
}

/**
 * Returns a new CompletableFuture that is already completed
 * exceptionally with the given exception.
*
 * @param ex the exception
 * @param <U> the type of the value
 * @return the exceptionally completed CompletableFuture
 * @throws NullPointerException if ex is null
 */
CompletableFuture!(U) failedFuture(U)(Throwable ex) {
    if (ex is null) {
        throw new NullPointerException();
    }
    // The exception is stored as-is inside an AltResult (no wrapping here).
    AltResult failure = new AltResult(ex);
    return new CompletableFuture!(U)(failure);
}

/**
 * Returns a new CompletionStage that is already completed
 * exceptionally with the given exception and supports only those
 * methods in interface {@link CompletionStage}.
 *
 * @param ex the exception
 * @param <U> the type of the value
 * @return the exceptionally completed CompletionStage
 * @throws NullPointerException if ex is null
 */
CompletionStage!(U) failedStage(U)(Throwable ex) {
    if (ex is null) {
        throw new NullPointerException();
    }
    AltResult failure = new AltResult(ex);
    return new MinimalStage!(U)(failure);
}


/* ------------- Arbitrary-arity constructions -------------- */

/**
 * Returns a new CompletableFuture that is completed when all of
 * the given CompletableFutures complete.  If any of the given
 * CompletableFutures complete exceptionally, then the returned
 * CompletableFuture also does so, with a CompletionException
 * holding this exception as its cause.  Otherwise, the results,
 * if any, of the given CompletableFutures are not reflected in
 * the returned CompletableFuture, but may be obtained by
 * inspecting them individually. If no CompletableFutures are
 * provided, returns a CompletableFuture completed with the value
 * {@code null}.
 *
 * <p>Among the applications of this method is to await completion
 * of a set of independent CompletableFutures before continuing a
 * program, as in: {@code CompletableFuture.allOf(c1, c2,
 * c3).join();}.
*
 * @param cfs the CompletableFutures
 * @return a new CompletableFuture that is completed when all of the
 * given CompletableFutures complete
 * @throws NullPointerException if the array or any of its elements are
 * {@code null}
 */
CompletableFuture!(void) allOf(T)(CompletableFuture!T[] cfs...) {
    return andTree!(T)(cfs, 0, cast(int)cfs.length - 1);
}

/**
 * Returns a new CompletableFuture that is completed when any of
 * the given CompletableFutures complete, with the same result.
 * Otherwise, if it completed exceptionally, the returned
 * CompletableFuture also does so, with a CompletionException
 * holding this exception as its cause.  If no CompletableFutures
 * are provided, returns an incomplete CompletableFuture.
 *
 * Elements are null-checked as they are inspected, so a null entry is
 * reported as a NullPointerException instead of being dereferenced.
 * A null entry located after an already-completed future is not
 * reached, because the method returns as soon as it finds a completed
 * source.
 *
 * @param cfs the CompletableFutures
 * @return a new CompletableFuture that is completed with the
 * result or exception of any of the given CompletableFutures when
 * one completes
 * @throws NullPointerException if an inspected element is {@code null}
 */
static CompletableFuture!(U) anyOf(U)(CompletableFuture!(U)[] cfs...) {
    int n;
    if ((n = cast(int)cfs.length) <= 1) {
        if (n == 0)
            return new CompletableFuture!(U)();
        if (cfs[0] is null)
            throw new NullPointerException();
        return uniCopyStage!(U, U)(cfs[0]);
    }

    // Fast path: if some source is already done, relay its outcome directly.
    foreach(CompletableFuture!(U) cf; cfs) {
        if (cf is null)
            throw new NullPointerException();
        if (cf.isDone()) {
            if(cf.isFaulted())
                return new CompletableFuture!(U)(encodeRelay(cf.altResult));
            else
                return new CompletableFuture!(U)(cf.result);
        }
    }

    // Copy before storing the array in the AnyOf completions: the storage
    // of a typesafe variadic argument must not be kept beyond this call.
    cfs = cfs.dup;
    CompletableFuture!(U) d = new CompletableFuture!U();
    foreach (CompletableFuture!(U) cf; cfs)
        cf.unipush(new AnyOf!(U)(d, cf, cfs));
    // If d was completed while we were adding completions, we should
    // clean the stack of any sources that may have had completions
    // pushed on their stack after d was completed.
    if (d._isDone)
        for (size_t i = 0, len = cfs.length; i < len; i++)
            if (cfs[i]._isDone)
                for (i++; i < len; i++)
                    if (!cfs[i]._isDone)
                        cfs[i].cleanStack();
    return d;
}


/**
 * Recursively constructs a tree of completions over cfs[lo .. hi]
 * (both bounds inclusive).
 *
 * The range is split at its midpoint; each half is either a single
 * element of cfs or the result of a recursive call. If either half is
 * not yet done, a BiRelay is pushed to complete the returned future
 * later; otherwise the returned future is completed immediately, with
 * the first exception found (left half first) or with null.
 *
 * @param cfs the source futures
 * @param lo index of the first element of the range
 * @param hi index of the last element of the range; lo > hi means empty
 * @return the future representing the whole range
 * @throws NullPointerException if an element of the range is null
 */
private CompletableFuture!(void) andTree(T)(CompletableFuture!T[] cfs,
                                       int lo, int hi) {
    CompletableFuture!(void) d = new CompletableFuture!(void)();
    if (lo > hi) // empty
        d.completeNull!false();
    else {
        AbstractCompletableFuture a, b;
        Throwable x;

        int mid = (lo + hi) >>> 1;
        if ((a = (lo == mid ? cfs[lo] :
                  andTree!(T)(cfs, lo, mid))) is null ||
            (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
                  andTree!(T)(cfs, mid+1, hi))) is null)
            throw new NullPointerException();

        if (!a.isDone() || !b.isDone())
            a.bipush(b, new BiRelay(d, a, b));
        else {
            // Both halves are done: propagate the first failure, if any.
            AltResult ar = a.altResult;
            AltResult ars = b.altResult;
            if(ar !is null && (x = ar.ex) !is null){
                d.completeThrowable!false(x, ar);
            } else if(ars !is null && (x = ars.ex) !is null){
                d.completeThrowable!false(x, ars);
            } else {
                d.completeNull!false();
            }
        }
    }
    return d;
}

/**
 * Returns the encoding of the given (non-null) exception as a
 * wrapped CompletionException unless it is one already.
 */
private AltResult encodeThrowable(Throwable x) {
    CompletionException ex = cast(CompletionException)x;
    if(ex is null) {
        return new AltResult(new CompletionException(x));
    } else {
        return new AltResult(x);
    }
}

/**
 * Returns the encoding of the given (non-null) exception as a
 * wrapped CompletionException unless it is one already.  May
 * return the given Object r (which must have been the result of a
 * source future) if it is equivalent, i.e. if this is a simple
 * relay of an existing CompletionException.
*/
private AltResult encodeThrowable(Throwable x, AltResult r) {
    if ((cast(CompletionException) x) is null) {
        // Not a CompletionException yet: wrap it.
        x = new CompletionException(x);
    } else if (r !is null && x is r.ex) {
        // Simple relay of the very same CompletionException: reuse r.
        return r;
    }
    return new AltResult(x);
}

/**
 * Creates a future that mirrors the outcome of src.
 *
 * If src is already done its outcome is copied right away (the
 * AltResult when src is faulted, otherwise the value, or nothing for
 * void); if not, a UniRelay is pushed onto src to do so later.
 *
 * @param src the future to copy from
 * @return the new dependent future
 */
private CompletableFuture!(U) uniCopyStage(U, T : U)(CompletableFuture!(T) src) {
    CompletableFuture!(U) copy = newIncompleteFuture!(U)();// src.newIncompleteFuture();
    if (!src._isDone) {
        src.unipush(new UniRelay!(U, T)(copy, src));
        return copy;
    }

    if (src.isFaulted()) {
        copy.completeValue!false(src.altResult);
    } else {
        static if (is(T == void)) {
            copy.completeValue!false();
        } else {
            copy.completeValue!false(src.result);
        }
    }
    return copy;
}

/**
 * Returns the encoding of a copied outcome; if exceptional,
 * rewraps as a CompletionException, else returns argument.
 */
private AltResult encodeRelay(AltResult ar) {
    if (ar is null) {
        return ar;
    }
    Throwable failure = ar.ex;
    if (failure is null) {
        return ar;
    }
    if ((cast(CompletionException) failure) is null) {
        return new AltResult(new CompletionException(failure));
    }
    return ar;
}

/**
 * Reports result using Future.get conventions.
*/
private V reportGet(V)(V r, AltResult ar) if(!is(V == void)) {
    // NOTE(review): a null ar always throws here, so r is never returned by
    // this function; presumably callers only call it for non-normal
    // outcomes -- confirm against the call sites.
    if (ar is null) // by convention below, null means interrupted
        throw new InterruptedException();

    Throwable x = ar.ex;
    if (x is null) {
        warning("to check");
        return V.init;
    }
    if (auto cancelled = cast(CancellationException) x)
        throw cancelled;
    if ((cast(CompletionException) x) !is null) {
        // Unwrap a CompletionException to its chained exception, if any.
        Throwable cause = x.next;
        if (cause !is null)
            x = cause;
    }
    throw new ExecutionException(x);
}

/**
 * Variant of reportGet for futures without a value: returns normally
 * only when ar carries no exception, otherwise throws as described by
 * the Future.get conventions (CancellationException as-is, anything
 * else wrapped in an ExecutionException).
 */
private void reportGet(AltResult ar) {
    if (ar is null) // by convention below, null means interrupted
        throw new InterruptedException();

    Throwable x = ar.ex;
    if (x is null) {
        warning("to check");
        return;
    }
    if (auto cancelled = cast(CancellationException) x)
        throw cancelled;
    if ((cast(CompletionException) x) !is null) {
        // Unwrap a CompletionException to its chained exception, if any.
        Throwable cause = x.next;
        if (cause !is null)
            x = cause;
    }
    throw new ExecutionException(x);
}


/**
 * Decodes outcome to return result or throw unchecked exception.
*/
private V reportJoin(V)(V r, AltResult ar) if(!is(V == void)) {
    if (ar is null) {
        // No alternative result recorded: hand back the plain value.
        return r;
    }
    Throwable x = ar.ex;
    if (x is null) {
        warning("to check");
        return V.init;
    }
    if (auto cancelled = cast(CancellationException) x)
        throw cancelled;
    if (auto completion = cast(CompletionException) x)
        throw completion;
    throw new CompletionException(x);
}


/**
 * Variant of reportJoin for futures without a value: returns normally
 * when ar is null or carries no exception; otherwise rethrows a
 * CancellationException or CompletionException as-is and wraps any
 * other exception in a CompletionException.
 */
private void reportJoin(AltResult ar) {
    if (ar is null) {
        return;
    }
    Throwable x = ar.ex;
    if (x is null) {
        warning("need to check");
        return;
    }
    if (auto cancelled = cast(CancellationException) x)
        throw cancelled;
    if (auto completion = cast(CompletionException) x)
        throw completion;
    throw new CompletionException(x);
}

/**
 * Returns a new incomplete CompletableFuture of the requested value
 * type. This implementation always creates a plain
 * CompletableFuture!(U).
 *
 * @param <U> the type of the value
 * @return a new CompletableFuture
 */
static CompletableFuture!(U) newIncompleteFuture(U)() {
    auto fresh = new CompletableFuture!(U)();
    return fresh;
}