/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.concurrency.FutureTask;

import hunt.concurrency.atomic.AtomicHelper;
import hunt.concurrency.Executors;
import hunt.concurrency.Future;
import hunt.concurrency.thread;

import hunt.Exceptions;
import hunt.util.Common;
import hunt.util.CompilerHelper;
import hunt.util.Runnable;

// Select the druntime thread module according to the compiler version check
// performed by CompilerHelper.isGreaterThan(2093).
static if (CompilerHelper.isGreaterThan(2093)) {
    import core.thread.osthread;
} else {
    import core.thread;
}

import core.time;

import hunt.concurrency.thread;
import hunt.logging;


/**
 * A cancellable asynchronous computation. This class provides a base
 * implementation of {@link Future}, with methods to start and cancel
 * a computation, query to see if the computation is complete, and
 * retrieve the result of the computation. The result can only be
 * retrieved when the computation has completed; the {@code get}
 * methods will block if the computation has not yet completed. Once
 * the computation has completed, the computation cannot be restarted
 * or cancelled (unless the computation is invoked using
 * {@link #runAndReset}).
 *
 * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
 * {@link Runnable} object. Because {@code FutureTask} implements
 * {@code Runnable}, a {@code FutureTask} can be submitted to an
 * {@link Executor} for execution.
 *
 * <p>In addition to serving as a standalone class, this class provides
 * {@code protected} functionality that may be useful when creating
 * customized task classes.
 *
 * @author Doug Lea
 * @param (V) The result type returned by this FutureTask's {@code get} methods
 */
class FutureTask(V) : RunnableFuture!(V) {
    /*
     * Revision notes: This differs from previous versions of this
     * class that relied on AbstractQueuedSynchronizer, mainly to
     * avoid surprising users about retaining interrupt status during
     * cancellation races. Sync control in the current design relies
     * on a "state" field updated via CAS to track completion, along
     * with a simple Treiber stack to hold waiting threads.
     */

    /**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)).
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     *
     * The numeric ordering matters: the code relies on
     * {@code s <= COMPLETING} meaning "not finished yet" and on
     * {@code s >= CANCELLED} meaning "cancelled".
     */
    private shared(int) state;
    private enum int NEW          = 0;
    private enum int COMPLETING   = 1;
    private enum int NORMAL       = 2;
    private enum int EXCEPTIONAL  = 3;
    private enum int CANCELLED    = 4;
    private enum int INTERRUPTING = 5;
    private enum int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable!(V) callable;

    /** The result to return from get(); absent when V is void */
    static if (!is(V == void)) {
        private V outcome; // written before state becomes NORMAL, read after
    }

    /** The throwable captured by setException(), reported by get() */
    private Throwable exception;

    /** The thread running the callable; CASed during run() */
    private Thread runner;

    /** Treiber stack of waiting threads */
    private WaitNode waiters;

    /**
     * Returns result or throws exception for completed task.
     *
     * @param s completed state value
     * @return the stored outcome when {@code s == NORMAL} (nothing for void)
     * @throws CancellationException if {@code s >= CANCELLED}
     * @throws ExecutionException wrapping the stored throwable otherwise
     */
    private V report(int s) {
        if (s == NORMAL) {
            static if (!is(V == void)) {
                return outcome;
            } else {
                return;
            }
        }

        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException(exception);
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     *
     * @param  callable the callable task
     * @throws NullPointerException if the callable is null
     */
    this(Callable!(V) callable) {
        if (callable is null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Runnable}. The runnable is adapted to a callable via
     * {@code Executors.callable}.
     *
     * For a non-void {@code V} the constructor also takes the result that
     * {@code get} will return on successful completion. If you don't need
     * a particular result, consider using the void form:
     * {@code auto f = new FutureTask!(void)(runnable)}
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion
     *        (non-void {@code V} only)
     */
    static if (is(V == void)) {
        this(Runnable runnable) {
            this.callable = Executors.callable(runnable);
            this.state = NEW;       // ensure visibility of callable
        }
    } else {
        this(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // ensure visibility of callable
        }
    }

    /**
     * @return {@code true} if the state is CANCELLED, INTERRUPTING or
     *         INTERRUPTED
     */
    bool isCancelled() {
        return state >= CANCELLED;
    }

    /**
     * @return {@code true} once the state has left NEW; this includes the
     *         transient COMPLETING and INTERRUPTING states
     */
    bool isDone() {
        return state != NEW;
    }

    /**
     * Attempts to cancel this task. Succeeds only if the state can be moved
     * away from NEW by this call.
     *
     * @param mayInterruptIfRunning if {@code true}, the runner thread (when it
     *        is a {@code ThreadEx}) is interrupted and the final state is
     *        INTERRUPTED; otherwise the final state is CANCELLED
     * @return {@code false} if the task was no longer in state NEW,
     *         {@code true} otherwise
     */
    bool cancel(bool mayInterruptIfRunning) {
        if (!(state == NEW && AtomicHelper.compareAndSet(state, NEW,
                mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    // Only ThreadEx instances can be interrupted; for any
                    // other runner the cast yields null and nothing is done.
                    ThreadEx t = cast(ThreadEx) runner;
                    if (t !is null)
                        t.interrupt();
                } finally { // final state
                    AtomicHelper.store(state, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

    /**
     * Waits if necessary for the computation to complete, then returns its
     * result.
     *
     * @return the computed result (nothing for void)
     * @throws CancellationException if the task was cancelled
     * @throws ExecutionException if the computation threw
     * @throws InterruptedException if the waiting thread was interrupted
     */
    V get() {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, Duration.zero);
        return report(s);
    }

    /**
     * Waits at most {@code timeout} for the computation to complete, then
     * returns its result.
     *
     * @param timeout the maximum time to wait
     * @return the computed result (nothing for void)
     * @throws CancellationException if the task was cancelled
     * @throws ExecutionException if the computation threw
     * @throws InterruptedException if the waiting thread was interrupted
     * @throws TimeoutException if the wait timed out
     */
    V get(Duration timeout) {
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, timeout)) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

    /**
     * Protected method invoked when this task transitions to state
     * {@code isDone} (whether normally or via cancellation). The
     * default implementation does nothing.  Subclasses may override
     * this method to invoke completion callbacks or perform
     * bookkeeping. Note that you can query status inside the
     * implementation of this method to determine whether this task
     * has been cancelled.
     */
    protected void done() { }

    /**
     * Sets the result of this future to the given value unless
     * this future has already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon successful completion of the computation.
     *
     * @param v the value (non-void {@code V} only; the void form takes
     *        no argument)
     */
    static if (is(V == void)) {
        protected void set() {
            if (AtomicHelper.compareAndSet(state, NEW, COMPLETING)) {
                AtomicHelper.store(state, NORMAL);  // final state
                finishCompletion();
            }
        }
    } else {
        protected void set(V v) {
            if (AtomicHelper.compareAndSet(state, NEW, COMPLETING)) {
                outcome = v;
                AtomicHelper.store(state, NORMAL);  // final state
                finishCompletion();
            }
        }
    }

    /**
     * Runs the wrapped callable on the calling thread and records either
     * its result (via set) or the throwable it raised (via setException).
     * Returns immediately if the task is not in state NEW or if another
     * thread has already claimed the runner slot.
     */
    void run() {
        if (state != NEW ||
            !AtomicHelper.compareAndSet(runner, null, Thread.getThis()))
            return;
        try {
            Callable!(V) c = callable;
            if (c !is null && state == NEW) {
                bool ran = false;
                // Only the call itself is guarded: anything thrown by set()
                // (and therefore by done()) propagates to the caller of run().
                static if (is(V == void)) {
                    try {
                        c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        setException(ex);
                    }
                    if (ran)
                        set();
                } else {
                    V result;
                    try {
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        setException(ex);
                    }
                    if (ran)
                        set(result);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    /**
     * Causes this future to report an {@link ExecutionException}
     * with the given throwable as its cause, unless this future has
     * already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon failure of the computation.
     *
     * @param t the cause of failure
     */
    protected void setException(Throwable t) {
        if (AtomicHelper.compareAndSet(state, NEW, COMPLETING)) {
            exception = t;
            AtomicHelper.store(state, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

    /**
     * Executes the computation without setting its result, and then
     * resets this future to initial state, failing to do so if the
     * computation encounters an exception or is cancelled.  This is
     * designed for use with tasks that intrinsically execute more
     * than once.
     *
     * @return {@code true} if successfully run and reset
     */
    protected bool runAndReset() {
        if (state != NEW ||
            !AtomicHelper.compareAndSet(runner, null, Thread.getThis()))
            return false;
        bool ran = false;
        int s = state;
        try {
            Callable!(V) c = callable;
            if (c !is null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        // Still NEW means neither setException nor cancel intervened.
        return ran && s == NEW;
    }

    /**
     * Ensures that any interrupt from a possible cancel(true) is only
     * delivered to a task while in run or runAndReset.
     *
     * @param s the state observed by the caller (INTERRUPTING or INTERRUPTED)
     */
    private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt

        assert(state == INTERRUPTED);

        // We want to clear any interrupt we may have received from
        // cancel(true).  However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.
        //
        // NOTE(review): presumably ThreadEx.interrupted() tests and clears
        // the current thread's interrupt flag - confirm against ThreadEx.
        ThreadEx.interrupted();
    }

    /**
     * Simple linked list nodes to record waiting threads in a Treiber
     * stack. Each node captures the thread that constructed it.
     */
    static final class WaitNode {
        Thread thread;
        WaitNode next;
        this() { thread = Thread.getThis(); }
    }

    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) !is null;) {
            // Detach the whole stack in one CAS, then walk it privately.
            if (AtomicHelper.compareAndSet(waiters, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t !is null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next is null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param timeout time to wait, if timed
     * @return state upon completion or at timeout
     * @throws InterruptedException if the calling thread is found
     *         interrupted while the task is still in state NEW
     */
    private int awaitDone(bool timed, Duration timeout) {
        // The loop advances one step per iteration: allocate a wait node,
        // push it on the waiters stack, then park. The clock is read only
        // once parking is actually about to happen.
        MonoTime startTime = MonoTime.zero; // MonoTime.zero means not yet parked
        WaitNode q = null;
        bool queued = false;
        for (;;) {
            int s = state;
            if (s > COMPLETING) {
                if (q !is null)
                    q.thread = null;
                return s;
            } else if (s == COMPLETING) {
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            } else if (ThreadEx.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            } else if (q is null) {
                // A non-positive timeout returns promptly without allocating.
                if (timed && timeout <= Duration.zero)
                    return s;
                q = new WaitNode();
            } else if (!queued) {
                queued = AtomicHelper.compareAndSet!(WaitNode)(waiters, q.next = waiters, q);
            } else if (timed) {
                Duration parkDuration;
                if (startTime == MonoTime.zero) { // first time
                    startTime = MonoTime.currTime;
                    // Keep the sentinel unambiguous if the clock reads zero.
                    if (startTime == MonoTime.zero)
                        startTime = MonoTime(1);
                    parkDuration = timeout;
                } else {
                    Duration elapsed = MonoTime.currTime - startTime;
                    if (elapsed >= timeout) {
                        removeWaiter(q);
                        return state;
                    }
                    parkDuration = timeout - elapsed;
                }
                // reading the clock may be slow; recheck before parking
                if (state < COMPLETING) {
                    LockSupport.park(this, parkDuration);
                }
            } else {
                LockSupport.park(this);
            }
        }
    }

    /**
     * Tries to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage.  Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers.  To avoid effects of unsplicing from already
     * removed nodes, the list is retraversed in case of an apparent
     * race.  This is slow when there are a lot of nodes, but we don't
     * expect lists to be long enough to outweigh higher-overhead
     * schemes.
     *
     * @param node the node to remove; null is ignored
     */
    private void removeWaiter(WaitNode node) {
        if (node !is null) {
            // A null thread marks the node as dead; the sweep below unlinks
            // every dead node it meets, not just this one.
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q !is null; q = s) {
                    s = q.next;
                    if (q.thread !is null)
                        pred = q;
                    else if (pred !is null) {
                        pred.next = s;
                        if (pred.thread is null) // check for race
                            continue retry;
                    }
                    else if (!AtomicHelper.compareAndSet(waiters, q, s))
                        continue retry;
                }
                break;
            }
        }
    }

    /**
     * Returns a string representation of this FutureTask.
     *
     * The result is the superclass representation followed by the
     * completion state in brackets: {@code "[Completed normally]"},
     * {@code "[Completed exceptionally: ...]"}, {@code "[Cancelled]"},
     * {@code "[Not completed]"} or {@code "[Not completed, task = ...]"}.
     *
     * @return a string representation of this FutureTask
     */
    override string toString() {
        string status;
        switch (state) {
            case NORMAL:
                status = "[Completed normally]";
                break;
            case EXCEPTIONAL:
                // setException() accepts null, so guard the dereference.
                Throwable ex = this.exception;
                status = "[Completed exceptionally: "
                    ~ (ex is null ? "null" : ex.toString()) ~ "]";
                break;
            case CANCELLED:
            case INTERRUPTING:
            case INTERRUPTED:
                status = "[Cancelled]";
                break;
            default:
                Callable!V c = this.callable;
                status = (c is null)
                    ? "[Not completed]"
                    : "[Not completed, task = " ~ (cast(Object) c).toString() ~ "]";
        }
        return super.toString() ~ status;
    }

}