/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.concurrency.FutureTask;

import hunt.concurrency.atomic.AtomicHelper;
import hunt.concurrency.Executors;
import hunt.concurrency.Future;
import hunt.concurrency.thread;

import hunt.Exceptions;
import hunt.util.Common;
import hunt.util.CompilerHelper;
import hunt.util.Runnable;

static if(CompilerHelper.isGreaterThan(2093)) {
    import core.thread.osthread;
} else {
    import core.thread;
}

import core.time;

import hunt.concurrency.thread;
import hunt.logging.ConsoleLogger;


/**
 * A cancellable asynchronous computation.  This class provides a base
 * implementation of {@link Future}, with methods to start and cancel
 * a computation, query to see if the computation is complete, and
 * retrieve the result of the computation.  The result can only be
 * retrieved when the computation has completed; the {@code get}
 * methods will block if the computation has not yet completed.  Once
 * the computation has completed, the computation cannot be restarted
 * or cancelled (unless the computation is invoked using
 * {@link #runAndReset}).
 *
 * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
 * {@link Runnable} object.  Because {@code FutureTask} implements
 * {@code Runnable}, a {@code FutureTask} can be submitted to an
 * {@link Executor} for execution.
 *
 * <p>In addition to serving as a standalone class, this class provides
 * {@code protected} functionality that may be useful when creating
 * customized task classes.
 *
 * @author Doug Lea
 * @param (V) The result type returned by this FutureTask's {@code get} methods
 */
class FutureTask(V) : RunnableFuture!(V) {
    /*
     * Revision notes: This differs from previous versions of this
     * class that relied on AbstractQueuedSynchronizer, mainly to
     * avoid surprising users about retaining interrupt status during
     * cancellation races. Sync control in the current design relies
     * on a "state" field updated via CAS to track completion, along
     * with a simple Treiber stack to hold waiting threads.
     */

    /**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)).  Transitions from these intermediate to final
     * states are plain stores because the values are unique and
     * cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     *
     * The numeric ordering matters: the code compares with
     * {@code <= COMPLETING}, {@code > COMPLETING}, {@code >= CANCELLED}
     * and {@code >= INTERRUPTING}.
     */
    private shared(int) state;
    private enum int NEW          = 0;
    private enum int COMPLETING   = 1;
    private enum int NORMAL       = 2;
    private enum int EXCEPTIONAL  = 3;
    private enum int CANCELLED    = 4;
    private enum int INTERRUPTING = 5;
    private enum int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable!(V) callable;
    /** The result to return from get(); absent when V is void */
    static if(!is(V == void)) {
        private V outcome; // non-volatile, protected by state reads/writes
    }
    /** The throwable that get() wraps in an ExecutionException */
    private Throwable exception;
    /** The thread running the callable; CASed during run() */
    private Thread runner;
    /** Treiber stack of waiting threads */
    private WaitNode waiters;

    /**
     * Returns result or throws exception for completed task.
     *
     * @param s completed state value
     * @return the stored outcome when {@code s} is NORMAL (nothing for void)
     * @throws CancellationException if {@code s} is CANCELLED or later
     * @throws ExecutionException wrapping the stored throwable otherwise
     */
    private V report(int s) {
        if (s == NORMAL) {
            static if(!is(V == void)) {
                return outcome;
            } else {
                return ;
            }
        }

        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException(exception);
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     *
     * @param  callable the callable task
     * @throws NullPointerException if the callable is null
     */
    this(Callable!(V) callable) {
        if (callable is null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Runnable}, and arrange that {@code get} will return the
     * given result on successful completion.
     *
     * When {@code V} is {@code void} there is no result parameter; use
     * {@code new FutureTask!(void)(runnable)}.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion
     *        (only present when {@code V} is not {@code void})
     * @throws NullPointerException if the runnable is null
     */
    static if(is(V == void)) {
        this(Runnable runnable) {
            // Fail at construction time, consistently with the Callable
            // constructor, rather than relying on the adapter factory.
            if (runnable is null)
                throw new NullPointerException();
            this.callable = Executors.callable(runnable);
            this.state = NEW;       // ensure visibility of callable
        }
    } else {
        this(Runnable runnable, V result) {
            // Fail at construction time, consistently with the Callable
            // constructor, rather than relying on the adapter factory.
            if (runnable is null)
                throw new NullPointerException();
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // ensure visibility of callable
        }
    }

    /**
     * Returns {@code true} if the state is CANCELLED, INTERRUPTING or
     * INTERRUPTED.
     */
    bool isCancelled() {
        return state >= CANCELLED;
    }

    /**
     * Returns {@code true} as soon as the state has left NEW, which
     * includes the transient COMPLETING and INTERRUPTING states.
     */
    bool isDone() {
        return state != NEW;
    }

    /**
     * Attempts to cancel this task. Succeeds only if the state can be
     * moved away from NEW by this call.
     *
     * @param mayInterruptIfRunning if {@code true}, the runner thread (when
     *        it is a {@code ThreadEx}) is interrupted
     * @return {@code false} if the task had already left the NEW state,
     *         {@code true} otherwise
     */
    bool cancel(bool mayInterruptIfRunning) {
        if (!(state == NEW && AtomicHelper.compareAndSet(state, NEW,
                mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    // A runner that is not a ThreadEx yields null here and
                    // is simply not interrupted.
                    ThreadEx t = cast(ThreadEx) runner;
                    if (t !is null)
                        t.interrupt();
                } finally { // final state
                    AtomicHelper.store(state, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

    /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @throws CancellationException {@inheritDoc}
     */
    V get() {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, Duration.zero);
        return report(s);
    }

    /**
     * Waits if necessary for at most the given time for the computation
     * to complete, and then retrieves its result.
     *
     * @param timeout the maximum time to wait
     * @throws CancellationException {@inheritDoc}
     * @throws TimeoutException if the task is still not past COMPLETING
     *         when the wait ends
     */
    V get(Duration timeout) {
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, timeout)) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

    /**
     * Protected method invoked when this task transitions to state
     * {@code isDone} (whether normally or via cancellation). The
     * default implementation does nothing.  Subclasses may override
     * this method to invoke completion callbacks or perform
     * bookkeeping. Note that you can query status inside the
     * implementation of this method to determine whether this task
     * has been cancelled.
     */
    protected void done() { }

    /**
     * Sets the result of this future to the given value unless
     * this future has already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon successful completion of the computation.
     *
     * @param v the value (absent when {@code V} is {@code void})
     */
    static if(is(V == void)) {
        protected void set() {
            if (AtomicHelper.compareAndSet(state, NEW, COMPLETING)) {
                AtomicHelper.store(state, NORMAL);  // final state
                finishCompletion();
            }
        }
    } else {
        protected void set(V v) {
            if (AtomicHelper.compareAndSet(state, NEW, COMPLETING)) {
                outcome = v;
                AtomicHelper.store(state, NORMAL);  // final state
                finishCompletion();
            }
        }
    }

    /**
     * Runs the wrapped callable on the calling thread and records its
     * result or the throwable it raised. Returns immediately, doing
     * nothing, if the task has left the NEW state or another thread has
     * already claimed the runner slot.
     */
    void run() {
        if (state != NEW ||
            !AtomicHelper.compareAndSet(runner, null, Thread.getThis()))
            return;
        try {
            Callable!(V) c = callable;
            if (c !is null && state == NEW) {
                bool ran = false;
                // Only this part depends on V: a void task has no value
                // to capture or publish.
                static if(is(V == void)) {
                    try {
                        c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        setException(ex);
                    }
                    if (ran)
                        set();
                } else {
                    V result;
                    try {
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        setException(ex);
                    }
                    if (ran)
                        set(result);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    /**
     * Causes this future to report an {@link ExecutionException}
     * with the given throwable as its cause, unless this future has
     * already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon failure of the computation.
     *
     * @param t the cause of failure
     */
    protected void setException(Throwable t) {
        if (AtomicHelper.compareAndSet(state, NEW, COMPLETING)) {
            exception = t;
            AtomicHelper.store(state, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

    /**
     * Executes the computation without setting its result, and then
     * resets this future to initial state, failing to do so if the
     * computation encounters an exception or is cancelled.  This is
     * designed for use with tasks that intrinsically execute more
     * than once.
     *
     * @return {@code true} if successfully run and reset
     */
    protected bool runAndReset() {
        if (state != NEW ||
            !AtomicHelper.compareAndSet(runner, null, Thread.getThis()))
            return false;
        bool ran = false;
        int s = state;
        try {
            Callable!(V) c = callable;
            if (c !is null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

    /**
     * Ensures that any interrupt from a possible cancel(true) is only
     * delivered to a task while in run or runAndReset.
     *
     * @param s the state observed by the caller; INTERRUPTING or INTERRUPTED
     */
    private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt

        assert(state == INTERRUPTED);

        // Interrupts may also be used as an independent mechanism for a
        // task to communicate with its caller, and there is no way to
        // clear only the cancellation interrupt. This implementation
        // nevertheless issues the call below, so an unrelated pending
        // interrupt is consumed together with the one from cancel(true).
        // NOTE(review): ThreadEx.interrupted() is presumed to test-and-clear
        // the current thread's interrupt status, as its Java namesake does;
        // the upstream Java FutureTask leaves the equivalent call commented
        // out. Confirm that clearing here is intended.
        ThreadEx.interrupted();
    }

    /**
     * Simple linked list nodes to record waiting threads in a Treiber
     * stack.  See other classes such as Phaser and SynchronousQueue
     * for more detailed explanation.
     */
    static final class WaitNode {
        Thread thread;
        WaitNode next;
        this() { thread = Thread.getThis(); }
    }

    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) !is null;) {
            // Detach the whole stack in one CAS; retry if a waiter was
            // pushed or removed concurrently.
            if (AtomicHelper.compareAndSet(waiters, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t !is null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next is null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param timeout time to wait, if timed
     * @return state upon completion or at timeout
     * @throws InterruptedException if the waiting thread is found
     *         interrupted while the task is still NEW
     */
    private int awaitDone(bool timed, Duration timeout) {
        // The code below is very delicate, to achieve these goals:
        // - read the monotonic clock exactly once for each call to park
        // - if timeout <= Duration.zero, return promptly without
        //   allocating a WaitNode or reading the clock
        // - after a spurious wakeup, do no worse than park again for the
        //   remaining time
        MonoTime startTime = MonoTime.zero;    // Special value zero means not yet parked
        WaitNode q = null;
        bool queued = false;
        for (;;) {
            int s = state;
            if (s > COMPLETING) {
                if (q !is null)
                    q.thread = null;
                return s;
            } else if (s == COMPLETING) {
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            } else if (ThreadEx.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            } else if (q is null) {
                if (timed && timeout <= Duration.zero)
                    return s;
                q = new WaitNode();
            } else if (!queued) {
                // Push q onto the waiters stack; on CAS failure the next
                // loop iteration retries with a fresh head.
                queued = AtomicHelper.compareAndSet!(WaitNode)(waiters, q.next = waiters, q);
            } else if (timed) {
                Duration parkDuration;
                if (startTime == MonoTime.zero) { // first time
                    startTime = MonoTime.currTime;
                    // Keep the "not yet parked" sentinel distinguishable
                    // from a genuine clock reading of zero.
                    if (startTime == MonoTime.zero)
                        startTime = MonoTime(1);
                    parkDuration = timeout;
                } else {
                    Duration elapsed = MonoTime.currTime - startTime;
                    if (elapsed >= timeout) {
                        removeWaiter(q);
                        return state;
                    }
                    parkDuration = timeout - elapsed;
                }
                // reading the clock may be slow; recheck before parking
                if (state < COMPLETING) {
                    LockSupport.park(this, parkDuration);
                }
            } else {
                LockSupport.park(this);
            }
        }
    }

    /**
     * Tries to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage.  Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers.  To avoid effects of unsplicing from already
     * removed nodes, the list is retraversed in case of an apparent
     * race.  This is slow when there are a lot of nodes, but we don't
     * expect lists to be long enough to outweigh higher-overhead
     * schemes.
     *
     * @param node the node to remove; {@code null} is ignored
     */
    private void removeWaiter(WaitNode node) {
        if (node !is null) {
            // A null thread marks the node as removed; the traversal
            // below unlinks every such node it meets.
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q !is null; q = s) {
                    s = q.next;
                    if (q.thread !is null)
                        pred = q;
                    else if (pred !is null) {
                        pred.next = s;
                        if (pred.thread is null) // check for race
                            continue retry;
                    }
                    else if (!AtomicHelper.compareAndSet(waiters, q, s))
                        continue retry;
                }
                break;
            }
        }
    }

    /**
     * Returns a string representation of this FutureTask.
     *
     * @implSpec
     * The default implementation returns a string identifying this
     * FutureTask, as well as its completion state.  The state, in
     * brackets, contains one of the strings {@code "Completed normally"},
     * {@code "Completed exceptionally: "} followed by the throwable,
     * {@code "Cancelled"}, or {@code "Not completed"} (optionally
     * followed by the task).
     *
     * @return a string representation of this FutureTask
     */
    override string toString() {
        string status;
        // Take a single snapshot so the chosen branch cannot be affected
        // by a concurrent state change.
        int s = state;
        switch (s) {
            case NORMAL:
                status = "[Completed normally]";
                break;
            case EXCEPTIONAL:
                // A subclass may have called setException(null); do not
                // dereference a null throwable.
                Throwable ex = exception;
                status = "[Completed exceptionally: "
                    ~ (ex is null ? "null" : ex.toString()) ~ "]";
                break;
            case CANCELLED:
            case INTERRUPTING:
            case INTERRUPTED:
                status = "[Cancelled]";
                break;
            default:
                // Read the field once: finishCompletion() may null it
                // concurrently.
                Callable!V c = this.callable;
                status = (c is null)
                    ? "[Not completed]"
                    : "[Not completed, task = " ~ (cast(Object)c).toString() ~ "]";
        }
        return super.toString() ~ status;
    }

}