1 module hunt.concurrency.ForkJoinTaskHelper;
2 
3 import hunt.Exceptions;
4 import hunt.util.WeakReference;
5 
6 import core.sync.condition;
7 import core.sync.mutex;
8 import core.thread;
9 
10 import std.exception;
11 
12 alias ReentrantLock = Mutex;
13 
14 
15 interface IForkJoinTask {
16     /**
17      * The status field holds run control status bits packed into a
18      * single int to ensure atomicity.  Status is initially zero, and
19      * takes on nonnegative values until completed, upon which it
20      * holds (sign bit) DONE, possibly with ABNORMAL (cancelled or
21      * exceptional) and THROWN (in which case an exception has been
22      * stored). Tasks with dependent blocked waiting joiners have the
23      * SIGNAL bit set.  Completion of a task with SIGNAL set awakens
24      * any waiters via notifyAll. (Waiters also help signal others
25      * upon completion.)
26      *
27      * These control bits occupy only (some of) the upper half (16
28      * bits) of status field. The lower bits are used for user-defined
29      * tags.
30      */
31     int getStatus(); // accessed directly by pool and workers
32 
33     int doExec();
34 
35     void internalWait(long timeout);
36 
37     bool cancel(bool mayInterruptIfRunning);
38 
39     /**
40      * Cancels, ignoring any exceptions thrown by cancel. Used during
41      * worker and pool shutdown. Cancel is spec'ed not to throw any
42      * exceptions, but if it does anyway, we have no recourse during
43      * shutdown, so guard against this case.
44      */
45     static void cancelIgnoringExceptions(IForkJoinTask t) {
46         if (t !is null && t.getStatus() >= 0) {
47             try {
48                 t.cancel(false);
49             } catch (Throwable ignore) {
50             }
51         }
52     }
53 }
54 
55 
56 
57 /**
58  * Key-value nodes for exception table.  The chained hash table
59  * uses identity comparisons, full locking, and weak references
60  * for keys. The table has a fixed capacity because it only
61  * maintains task exceptions long enough for joiners to access
62  * them, so should never become very large for sustained
63  * periods. However, since we do not know when the last joiner
64  * completes, we must use weak references and expunge them. We do
65  * so on each operation (hence full locking). Also, some thread in
66  * any ForkJoinPool will call helpExpungeStaleExceptions when its
67  * pool becomes isQuiescent.
68  */
69 final class ExceptionNode : WeakReference!IForkJoinTask { 
70     Throwable ex;
71     ExceptionNode next;
72     ThreadID thrower;  // use id not ref to avoid weak cycles
73     size_t hashCode;  // store task hashCode before weak ref disappears
74     this(IForkJoinTask task, Throwable ex, ExceptionNode next) {
75         this.ex = ex;
76         this.next = next;
77         this.thrower = Thread.getThis().id();
78         this.hashCode = hashOf(task);
79         super(task);
80     }
81 }
82 
83 
84 /**
85 */
86 struct ForkJoinTaskHelper {
87 
88     // Exception table support
89 
90     /**
91      * Hash table of exceptions thrown by tasks, to enable reporting
92      * by callers. Because exceptions are rare, we don't directly keep
93      * them with task objects, but instead use a weak ref table.  Note
94      * that cancellation exceptions don't appear in the table, but are
95      * instead recorded as status values.
96      *
97      * The exception table has a fixed capacity.
98      */
99     package __gshared ExceptionNode[] exceptionTable;
100 
101     /** Lock protecting access to exceptionTable. */
102     package __gshared ReentrantLock exceptionTableLock;
103 
104     shared static this() {
105         exceptionTable = new ExceptionNode[32];
106         exceptionTableLock = new ReentrantLock();
107     }
108 
109     /**
110      * A version of "sneaky throw" to relay exceptions.
111      */
112     static void rethrow(Throwable ex) {
113         uncheckedThrow!(RuntimeException)(ex);
114     }
115 
116     /**
117      * The sneaky part of sneaky throw, relying on generics
118      * limitations to evade compiler complaints about rethrowing
119      * unchecked exceptions.
120      */
121     static void uncheckedThrow(T)(Throwable t) if(is(T : Throwable)) {
122         if (t !is null)
123             throw cast(T)t; // rely on vacuous cast
124         else
125             throw new Error("Unknown Exception");
126     }
127 }