1 module hunt.concurrency.ForkJoinTaskHelper; 2 3 import hunt.Exceptions; 4 import hunt.util.WeakReference; 5 6 import core.sync.condition; 7 import core.sync.mutex; 8 import core.thread; 9 10 import std.exception; 11 12 alias ReentrantLock = Mutex; 13 14 15 interface IForkJoinTask { 16 /** 17 * The status field holds run control status bits packed into a 18 * single int to ensure atomicity. Status is initially zero, and 19 * takes on nonnegative values until completed, upon which it 20 * holds (sign bit) DONE, possibly with ABNORMAL (cancelled or 21 * exceptional) and THROWN (in which case an exception has been 22 * stored). Tasks with dependent blocked waiting joiners have the 23 * SIGNAL bit set. Completion of a task with SIGNAL set awakens 24 * any waiters via notifyAll. (Waiters also help signal others 25 * upon completion.) 26 * 27 * These control bits occupy only (some of) the upper half (16 28 * bits) of status field. The lower bits are used for user-defined 29 * tags. 30 */ 31 int getStatus(); // accessed directly by pool and workers 32 33 int doExec(); 34 35 void internalWait(long timeout); 36 37 bool cancel(bool mayInterruptIfRunning); 38 39 /** 40 * Cancels, ignoring any exceptions thrown by cancel. Used during 41 * worker and pool shutdown. Cancel is spec'ed not to throw any 42 * exceptions, but if it does anyway, we have no recourse during 43 * shutdown, so guard against this case. 44 */ 45 static void cancelIgnoringExceptions(IForkJoinTask t) { 46 if (t !is null && t.getStatus() >= 0) { 47 try { 48 t.cancel(false); 49 } catch (Throwable ignore) { 50 } 51 } 52 } 53 } 54 55 56 57 /** 58 * Key-value nodes for exception table. The chained hash table 59 * uses identity comparisons, full locking, and weak references 60 * for keys. The table has a fixed capacity because it only 61 * maintains task exceptions long enough for joiners to access 62 * them, so should never become very large for sustained 63 * periods. However, since we do not know when the last joiner 64 * completes, we must use weak references and expunge them. We do 65 * so on each operation (hence full locking). Also, some thread in 66 * any ForkJoinPool will call helpExpungeStaleExceptions when its 67 * pool becomes isQuiescent. 68 */ 69 final class ExceptionNode : WeakReference!IForkJoinTask { 70 Throwable ex; 71 ExceptionNode next; 72 ThreadID thrower; // use id not ref to avoid weak cycles 73 size_t hashCode; // store task hashCode before weak ref disappears 74 this(IForkJoinTask task, Throwable ex, ExceptionNode next) { 75 this.ex = ex; 76 this.next = next; 77 this.thrower = Thread.getThis().id(); 78 this.hashCode = hashOf(task); 79 super(task); 80 } 81 } 82 83 84 /** 85 */ 86 struct ForkJoinTaskHelper { 87 88 // Exception table support 89 90 /** 91 * Hash table of exceptions thrown by tasks, to enable reporting 92 * by callers. Because exceptions are rare, we don't directly keep 93 * them with task objects, but instead use a weak ref table. Note 94 * that cancellation exceptions don't appear in the table, but are 95 * instead recorded as status values. 96 * 97 * The exception table has a fixed capacity. 98 */ 99 package __gshared ExceptionNode[] exceptionTable; 100 101 /** Lock protecting access to exceptionTable. */ 102 package __gshared ReentrantLock exceptionTableLock; 103 104 shared static this() { 105 exceptionTable = new ExceptionNode[32]; 106 exceptionTableLock = new ReentrantLock(); 107 } 108 109 /** 110 * A version of "sneaky throw" to relay exceptions. 111 */ 112 static void rethrow(Throwable ex) { 113 uncheckedThrow!(RuntimeException)(ex); 114 } 115 116 /** 117 * The sneaky part of sneaky throw, relying on generics 118 * limitations to evade compiler complaints about rethrowing 119 * unchecked exceptions. 120 */ 121 static void uncheckedThrow(T)(Throwable t) if(is(T : Throwable)) { 122 if (t !is null) 123 throw cast(T)t; // rely on vacuous cast 124 else 125 throw new Error("Unknown Exception"); 126 } 127 }