1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.concurrency.FutureTask;
13 
14 import hunt.concurrency.atomic.AtomicHelper;
15 import hunt.concurrency.Executors;
16 import hunt.concurrency.Future;
17 import hunt.concurrency.thread;
18 
19 import hunt.Exceptions;
20 import hunt.util.Common;
21 import hunt.util.CompilerHelper;
22 import hunt.util.Runnable;
23 
24 static if(CompilerHelper.isGreaterThan(2093)) {
25     import core.thread.osthread;
26 } else {
27     import core.thread;
28 }
29 
30 import core.time;
31 
32 import hunt.concurrency.thread;
33 import hunt.logging;
34 
35 
36 /**
37  * A cancellable asynchronous computation.  This class provides a base
38  * implementation of {@link Future}, with methods to start and cancel
39  * a computation, query to see if the computation is complete, and
40  * retrieve the result of the computation.  The result can only be
41  * retrieved when the computation has completed; the {@code get}
42  * methods will block if the computation has not yet completed.  Once
43  * the computation has completed, the computation cannot be restarted
44  * or cancelled (unless the computation is invoked using
45  * {@link #runAndReset}).
46  *
47  * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
48  * {@link Runnable} object.  Because {@code FutureTask} implements
49  * {@code Runnable}, a {@code FutureTask} can be submitted to an
50  * {@link Executor} for execution.
51  *
52  * <p>In addition to serving as a standalone class, this class provides
53  * {@code protected} functionality that may be useful when creating
54  * customized task classes.
55  *
56  * @author Doug Lea
57  * @param (V) The result type returned by this FutureTask's {@code get} methods
58  */
59 class FutureTask(V) : RunnableFuture!(V) {
60     /*
61      * Revision notes: This differs from previous versions of this
62      * class that relied on AbstractQueuedSynchronizer, mainly to
63      * avoid surprising users about retaining interrupt status during
64      * cancellation races. Sync control in the current design relies
65      * on a "state" field updated via CAS to track completion, along
66      * with a simple Treiber stack to hold waiting threads.
67      */
68 
69     /**
70      * The run state of this task, initially NEW.  The run state
71      * transitions to a terminal state only in methods set,
72      * setException, and cancel.  During completion, state may take on
73      * values of COMPLETING (while outcome is being set) or
74      * INTERRUPTING (only while interrupting the runner to satisfy a
75      * cancel(true)). Transitions from these intermediate to final
76      * states use cheaper ordered/lazy writes because values are unique
77      * and cannot be further modified.
78      *
79      * Possible state transitions:
80      * NEW -> COMPLETING -> NORMAL
81      * NEW -> COMPLETING -> EXCEPTIONAL
82      * NEW -> CANCELLED
83      * NEW -> INTERRUPTING -> INTERRUPTED
84      */
    private shared(int) state;
    // The numeric order of these values is significant: the code compares
    // states with relational operators (`s <= COMPLETING` is treated as
    // "outcome not yet available", `s >= CANCELLED` as "cancelled", and
    // `s >= INTERRUPTING` as "a cancel(true) is or was in progress").
    private enum int NEW          = 0; // value assigned by every constructor
    private enum int COMPLETING   = 1; // set()/setException() is recording the outcome
    private enum int NORMAL       = 2; // report() returns the result
    private enum int EXCEPTIONAL  = 3; // report() throws ExecutionException
    private enum int CANCELLED    = 4; // reached via cancel(false)
    private enum int INTERRUPTING = 5; // cancel(true) is about to interrupt the runner
    private enum int INTERRUPTED  = 6; // cancel(true) has stored its final state

    /** The underlying callable; nulled out by finishCompletion() */
    private Callable!(V) callable;
    /** The result returned from get(); the field does not exist when V is void */
    static if(!is(V == void)) {
        private V outcome; // non-volatile, protected by state reads/writes
    }
    /** The failure cause recorded by setException(); wrapped by report() */
    private Throwable exception;
    /** The thread running the callable; CASed during run() */
    private Thread runner;
    /** Treiber stack of waiting threads */
    private WaitNode waiters;
105 
106     /**
107      * Returns result or throws exception for completed task.
108      *
109      * @param s completed state value
110      */
111 
112     private V report(int s) {
113         // Object x = outcome;
114         if (s == NORMAL) {
115             static if(!is(V == void)) {
116                 return outcome; // cast(V)
117             } else {
118                 return ; // cast(V)
119             }
120         }
121             
122         if (s >= CANCELLED)
123             throw new CancellationException();
124         throw new ExecutionException(exception);
125     }
126 
127     /**
128      * Creates a {@code FutureTask} that will, upon running, execute the
129      * given {@code Callable}.
130      *
131      * @param  callable the callable task
132      * @throws NullPointerException if the callable is null
133      */
134     this(Callable!(V) callable) {
135         if (callable is null)
136             throw new NullPointerException();
137         this.callable = callable;
138         this.state = NEW;       // ensure visibility of callable
139     }
140 
141     /**
142      * Creates a {@code FutureTask} that will, upon running, execute the
143      * given {@code Runnable}, and arrange that {@code get} will return the
144      * given result on successful completion.
145      *
146      * @param runnable the runnable task
     * @param result the result to return on successful completion. This
     * two-argument form exists only when {@code V} is not {@code void}.
     * If you don't need a particular result, use the single-argument form:
     * {@code auto f = new FutureTask!(void)(runnable)}
151      * @throws NullPointerException if the runnable is null
152      */
static if(is(V == void)) {
    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Runnable}. No result value is stored when {@code V}
     * is {@code void}.
     *
     * @param runnable the runnable task
     * @throws NullPointerException if the runnable is null
     */
    this(Runnable runnable) {
        // Reject null up front, exactly as the Callable constructor does,
        // instead of depending on Executors.callable to validate it.
        if (runnable is null)
            throw new NullPointerException();
        this.callable = Executors.callable(runnable);
        this.state = NEW;       // ensure visibility of callable
    }
} else {
    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Runnable}, and arrange that {@code get} will return
     * the given result on successful completion.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion
     * @throws NullPointerException if the runnable is null
     */
    this(Runnable runnable, V result) {
        // Reject null up front, exactly as the Callable constructor does,
        // instead of depending on Executors.callable to validate it.
        if (runnable is null)
            throw new NullPointerException();
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
}
164 
165     bool isCancelled() {
166         return state >= CANCELLED;
167     }
168 
169     bool isDone() {
170         return state != NEW;
171     }
172 
    /**
     * Attempts to cancel this task.
     *
     * The attempt succeeds only if this call moves the state away from
     * NEW: to INTERRUPTING when {@code mayInterruptIfRunning} is set,
     * otherwise straight to CANCELLED.
     *
     * @param mayInterruptIfRunning {@code true} if the thread recorded in
     *        {@code runner} should be interrupted
     * @return {@code false} if the state was not NEW or the CAS failed;
     *         {@code true} otherwise
     */
    bool cancel(bool mayInterruptIfRunning) {
        // One attempt only: if the task already left NEW (completed,
        // cancelled, or another cancel won the CAS) there is nothing to do.
        if (!(state == NEW && AtomicHelper.compareAndSet(state, NEW,
            mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    // Only a runner that is a ThreadEx is interrupted; the
                    // cast yields null for any other Thread, and runner is
                    // null when no thread is currently in run().
                    ThreadEx t = cast(ThreadEx) runner;
                    if (t !is null)
                        t.interrupt();
                } finally { // final state
                    // Always leave INTERRUPTING, even if interrupt() threw:
                    // handlePossibleCancellationInterrupt() spins until it does.
                    AtomicHelper.store(state, INTERRUPTED);
                }
            }
        } finally {
            // Wake every waiter and invoke done() on both cancel paths.
            finishCompletion();
        }
        return true;
    }
192 
193     /**
194      * @throws CancellationException {@inheritDoc}
195      */
196     V get() {
197         int s = state;
198         if (s <= COMPLETING)
199             s = awaitDone(false, Duration.zero);
200         return report(s);
201     }
202 
203     /**
204      * @throws CancellationException {@inheritDoc}
205      */
206     V get(Duration timeout) {
207         int s = state;
208         if (s <= COMPLETING &&
209             (s = awaitDone(true, timeout)) <= COMPLETING)
210             throw new TimeoutException();
211         return report(s);
212     }
213 
214     /**
215      * Protected method invoked when this task transitions to state
216      * {@code isDone} (whether normally or via cancellation). The
217      * default implementation does nothing.  Subclasses may override
218      * this method to invoke completion callbacks or perform
219      * bookkeeping. Note that you can query status inside the
220      * implementation of this method to determine whether this task
221      * has been cancelled.
222      */
223     protected void done() { }
224 
225     /**
226      * Sets the result of this future to the given value unless
227      * this future has already been set or has been cancelled.
228      *
229      * <p>This method is invoked internally by the {@link #run} method
230      * upon successful completion of the computation.
231      *
232      * @param v the value
233      */
234 
static if(is(V == void)) {
    /**
     * Marks this future as normally completed unless it has already
     * been set or has been cancelled. There is no value to record when
     * {@code V} is {@code void}.
     */
    protected void set() {
        if (AtomicHelper.compareAndSet(state, NEW, COMPLETING)) {
            AtomicHelper.store(state, NORMAL);  // final state
            finishCompletion();
        }
    }
} else {
    /**
     * Records {@code v} as the result and marks this future as normally
     * completed unless it has already been set or has been cancelled.
     *
     * @param v the value
     */
    protected void set(V v) {
        if (AtomicHelper.compareAndSet(state, NEW, COMPLETING)) {
            // The outcome is written while the state is COMPLETING and
            // becomes readable once NORMAL is stored.
            outcome = v;
            AtomicHelper.store(state, NORMAL);  // final state
            finishCompletion();
        }
    }
}

    /**
     * Runs the wrapped callable on the calling thread and records its
     * result, or the {@code Throwable} it threw, unless the task has been
     * cancelled in the meantime.
     *
     * The call is a no-op when the state is not NEW or when another
     * thread already registered itself as the runner. A single definition
     * serves both the {@code void} and the value-returning instantiation;
     * only the handling of the result differs.
     */
    void run() {
        // Claim the task: only one thread may install itself as runner.
        if (state != NEW ||
            !AtomicHelper.compareAndSet(runner, null, Thread.getThis()))
            return;
        try {
            Callable!(V) c = callable;
            if (c !is null && state == NEW) {
                bool ran = false;
                // set() is deliberately called outside the try block so
                // that anything it throws is not recorded as a task failure.
                static if(is(V == void)) {
                    try {
                        c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        setException(ex);
                    }
                    if (ran)
                        set();
                } else {
                    V result;
                    try {
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        setException(ex);
                    }
                    if (ran)
                        set(result);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
314 
315     /**
316      * Causes this future to report an {@link ExecutionException}
317      * with the given throwable as its cause, unless this future has
318      * already been set or has been cancelled.
319      *
320      * <p>This method is invoked internally by the {@link #run} method
321      * upon failure of the computation.
322      *
323      * @param t the cause of failure
324      */
325     protected void setException(Throwable t) {
326         if (AtomicHelper.compareAndSet(state, NEW, COMPLETING)) {
327             exception = t;
328             AtomicHelper.store(state, EXCEPTIONAL); // final state
329             finishCompletion();
330         }
331     }
332 
333     /**
334      * Executes the computation without setting its result, and then
335      * resets this future to initial state, failing to do so if the
336      * computation encounters an exception or is cancelled.  This is
337      * designed for use with tasks that intrinsically execute more
338      * than once.
339      *
340      * @return {@code true} if successfully run and reset
341      */
    protected bool runAndReset() {
        // Same claim protocol as run(): the state must be NEW and this
        // thread must win the CAS on runner.
        if (state != NEW ||
            !AtomicHelper.compareAndSet(runner, null, Thread.getThis()))
            return false;
        bool ran = false;
        int s = state;
        try {
            Callable!(V) c = callable;
            if (c !is null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    // A failure is recorded like in run(), which moves the
                    // state off NEW and makes this method return false.
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        // Success requires both a clean call and a state that is still NEW,
        // i.e. no exception was recorded and no cancel intervened.
        return ran && s == NEW;
    }
370 
371     /**
372      * Ensures that any interrupt from a possible cancel(true) is only
373      * delivered to a task while in run or runAndReset.
374      */
    private void handlePossibleCancellationInterrupt(int s) {
        // Callers pass the state they just read and only call this when
        // it is INTERRUPTING or INTERRUPTED (s >= INTERRUPTING).

        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt

        // cancel(true) always stores INTERRUPTED after INTERRUPTING.
        assert(state == INTERRUPTED);

        // We want to clear any interrupt we may have received from
        // cancel(true).  However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.
        //
        // NOTE(review): despite the caveat above, the call below is active;
        // presumably it tests and clears the current thread's interrupt
        // flag. Confirm that clearing unconditionally is intended.
        ThreadEx.interrupted();
    }
392 
393     /**
394      * Simple linked list nodes to record waiting threads in a Treiber
395      * stack.  See other classes such as Phaser and SynchronousQueue
396      * for more detailed explanation.
397      */
398     static final class WaitNode {
399         Thread thread;
400         WaitNode next;
401         this() { thread = Thread.getThis(); }
402     }
403 
404     /**
405      * Removes and signals all waiting threads, invokes done(), and
406      * nulls out callable.
407      */
    private void finishCompletion() {
        // assert state > COMPLETING;
        // Detach the whole waiter stack with a single CAS; if the CAS
        // fails the head changed, so re-read it and try again.
        for (WaitNode q; (q = waiters) !is null;) {
            if (AtomicHelper.compareAndSet(waiters, q, null)) {
                // The detached list is now walked without further CAS.
                for (;;) {
                    Thread t = q.thread;
                    if (t !is null) {
                        // Clear the slot before unparking the thread.
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next is null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        // Subclass hook; runs after the waiters have been signalled.
        done();

        callable = null;        // to reduce footprint
    }
432 
433     /**
434      * Awaits completion or aborts on interrupt or timeout.
435      *
436      * @param timed true if use timed waits
     * @param timeout maximum time to wait, if timed
438      * @return state upon completion or at timeout
439      */
    private int awaitDone(bool timed, Duration timeout) {
        // The code below is very delicate, to achieve these goals:
        // - read the clock (MonoTime.currTime) exactly once for each call
        //   to park
        // - if timed and timeout <= 0, return promptly without allocating
        //   a WaitNode or reading the clock
        // - when the loop comes round again before the timeout has
        //   elapsed, park only for the time that remains
        //
        // Each pass through the loop performs at most one step (allocate
        // the node, enqueue it, or park) and then re-reads the state.
        MonoTime startTime = MonoTime.zero;    // MonoTime.zero means not yet parked
        WaitNode q = null;
        bool queued = false;
        for (;;) {
            int s = state;
            if (s > COMPLETING) {
                // Settled: clear our node's thread slot and return the state.
                if (q !is null)
                    q.thread = null;
                return s;
            } else if (s == COMPLETING) {
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            } else if (ThreadEx.interrupted()) {
                // removeWaiter() accepts null, so this is safe before q exists.
                removeWaiter(q);
                throw new InterruptedException();
            } else if (q is null) {
                if (timed && timeout <= Duration.zero)
                    return s;
                q = new WaitNode();
            } else if (!queued) {
                // Push q on top of the stack: link it to the current head,
                // then CAS the head to q; a failed CAS is retried next pass.
                queued = AtomicHelper.compareAndSet!(WaitNode)(waiters, q.next = waiters, q);
            } else if (timed) {
                Duration parkDuration;
                if (startTime == MonoTime.zero) { // first time
                    startTime = MonoTime.currTime;
                    // Keep a genuine reading distinct from the
                    // "not yet parked" sentinel.
                    if (startTime == MonoTime.zero)
                        startTime = MonoTime(1);
                    parkDuration = timeout;
                } else {
                    Duration elapsed = MonoTime.currTime - startTime;
                    if (elapsed >= timeout) {
                        // Timed out: unlink our node and report whatever
                        // the state is now.
                        removeWaiter(q);
                        return state;
                    }
                    parkDuration = timeout - elapsed;
                }
                // reading the clock may be slow; recheck before parking
                if (state < COMPLETING) {
                    LockSupport.park(this, parkDuration);
                }
            } else {
                // Untimed wait; finishCompletion() unparks every queued thread.
                LockSupport.park(this);
            }
        }
    }
494 
495     /**
496      * Tries to unlink a timed-out or interrupted wait node to avoid
497      * accumulating garbage.  Internal nodes are simply unspliced
498      * without CAS since it is harmless if they are traversed anyway
499      * by releasers.  To avoid effects of unsplicing from already
500      * removed nodes, the list is retraversed in case of an apparent
501      * race.  This is slow when there are a lot of nodes, but we don't
502      * expect lists to be long enough to outweigh higher-overhead
503      * schemes.
504      */
    private void removeWaiter(WaitNode node) {
        // A null node (the caller never allocated one) is a no-op.
        if (node !is null) {
            // Clearing the thread slot is what marks the node as removable;
            // the traversal below unlinks every node whose slot is null.
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                // pred: last node seen that is still waiting
                // q:    node under inspection
                // s:    q's successor, read before q may be unlinked
                for (WaitNode pred = null, q = waiters, s; q !is null; q = s) {
                    s = q.next;
                    if (q.thread !is null)
                        pred = q;
                    else if (pred !is null) {
                        // Interior node: splice it out without CAS.
                        pred.next = s;
                        if (pred.thread is null) // check for race
                            continue retry;
                    }
                    // Head node: the stack top itself must be swung with CAS.
                    else if (!AtomicHelper.compareAndSet(waiters, q, s))
                        continue retry;
                }
                break;
            }
        }
    }
526 
527     /**
528      * Returns a string representation of this FutureTask.
529      *
530      * @implSpec
531      * The default implementation returns a string identifying this
532      * FutureTask, as well as its completion state.  The state, in
533      * brackets, contains one of the strings {@code "Completed Normally"},
534      * {@code "Completed Exceptionally"}, {@code "Cancelled"}, or {@code
535      * "Not completed"}.
536      *
537      * @return a string representation of this FutureTask
538      */
539     override string toString() {
540         string status;
541         switch (state) {
542         case NORMAL:
543             status = "[Completed normally]";
544             break;
545         case EXCEPTIONAL:
546             status = "[Completed exceptionally: " ~ exception.toString() ~ "]";
547             break;
548         case CANCELLED:
549         case INTERRUPTING:
550         case INTERRUPTED:
551             status = "[Cancelled]";
552             break;
553         default:
554             Callable!V callable = this.callable;
555             status = (callable is null)
556                 ? "[Not completed]"
557                 : "[Not completed, task = " ~ (cast(Object)callable).toString() ~ "]";
558         }
559         return super.toString() ~ status;
560     }
561 
562 }