/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.concurrency.FuturePromise;

import hunt.concurrency.Future;
import hunt.concurrency.Promise;

import hunt.Exceptions;
import hunt.logging.ConsoleLogger;

import core.atomic;
import core.thread;
import core.sync.mutex;
import core.sync.condition;

import std.format;
import std.datetime;

/// Callback type that receives a single value of type `T`.
alias ThenHandler(T) = void delegate(T);

/**
 * A value holder that is both a `Future!T` (consumer side: `get`, `isDone`,
 * `cancel`, ...) and a `Promise!T` (producer side: `succeeded`, `failed`).
 *
 * Only the first call among `succeeded`, `failed`, `cancel` or a timed-out
 * `get` completes the promise; the winner is chosen with a compare-and-swap
 * on `_isCompleting`. Later attempts are ignored (and logged).
 *
 * Threads blocked in `get` wait on `_waiterCondition`, which is signalled
 * from `onCompleted`.
 */
class FuturePromise(T) : Future!T, Promise!T {
    alias VoidHandler = void delegate();

    // Set exactly once (via cas) by whichever caller wins the right to complete.
    private shared bool _isCompleting = false;
    // Becomes true in onCompleted(), after _result/_cause have been stored.
    private bool _isCompleted = false;
    // null means "succeeded"; anything else is the failure/cancellation reason.
    private Exception _cause;
    private string _id;
    private Mutex _waiterLocker;
    private Condition _waiterCondition;

    this() {
        _waiterLocker = new Mutex(this);
        _waiterCondition = new Condition(_waiterLocker);
    }

    /// Returns: the identifier previously stored with `id(string)`.
    string id() {
        return _id;
    }

    /// Stores an identifier for this promise.
    void id(string id) {
        _id = id;
    }

    /// Invoked from `onCompleted` when the promise completed with a cause.
    ThenHandler!(Exception) _thenFailedHandler;

    static if(is(T == void)) {
        /// Invoked from `onCompleted` when the promise completed without a cause.
        VoidHandler _thenSucceededHandler;

        /**
         * Chains a continuation onto this promise.
         *
         * The handlers are stored in single slots, so a second call replaces
         * the handlers installed by the first. They are only invoked from
         * `onCompleted`; registering after completion has no effect.
         *
         * Params:
         *  handler = called when this promise succeeds; its return value
         *            completes the returned promise.
         *
         * Returns: a new promise that succeeds with the handler's result, or
         *          fails with an `Exception("then exception", cause)` when
         *          either this promise fails or the handler throws.
         */
        FuturePromise!R then(R)(R delegate() handler) {
            FuturePromise!R result = new FuturePromise!(R);
            _thenSucceededHandler = () {
                try {
                    // Mirror the non-void branch: a void-returning handler has
                    // no value to forward.
                    static if(is(R == void)) {
                        handler();
                        result.succeeded();
                    } else {
                        R r = handler();
                        result.succeeded(r);
                    }
                } catch(Exception ex) {
                    Exception e = new Exception("then exception", ex);
                    result.failed(e);
                }
            };

            _thenFailedHandler = (Exception ex) {
                Exception e = new Exception("then exception", ex);
                result.failed(e);
            };

            return result;
        }

        /**
         * Completes the promise successfully.
         *
         * If the promise has already been completed (or is being completed)
         * the call is ignored and a warning is logged.
         *
         * TODO:
         *     1) keep this operation atomic
         *     2) return a flag to indicate whether this option is successful.
         */
        void succeeded() {
            if (cas(&_isCompleting, false, true)) {
                onCompleted();
            } else {
                warningf("This promise has been done, and can't be set again. cause: %s",
                    causeName());
            }
        }

    } else {
        /// Invoked from `onCompleted` when the promise completed without a cause.
        ThenHandler!(T) _thenSucceededHandler;

        /**
         * Chains a continuation onto this promise.
         *
         * The handlers are stored in single slots, so a second call replaces
         * the handlers installed by the first. They are only invoked from
         * `onCompleted`; registering after completion has no effect.
         *
         * Params:
         *  handler = called with this promise's result when it succeeds; its
         *            return value (if any) completes the returned promise.
         *
         * Returns: a new promise that succeeds with the handler's result, or
         *          fails with an `Exception("then exception", cause)` when
         *          either this promise fails or the handler throws.
         */
        FuturePromise!R then(R)(R delegate(T) handler) {
            FuturePromise!R result = new FuturePromise!(R);
            _thenSucceededHandler = (T t) {
                try {
                    static if(is(R == void)) {
                        handler(t);
                        result.succeeded();
                    } else {
                        R r = handler(t);
                        result.succeeded(r);
                    }
                } catch(Exception ex) {
                    Exception e = new Exception("then exception", ex);
                    result.failed(e);
                }
            };

            _thenFailedHandler = (Exception ex) {
                Exception e = new Exception("then exception", ex);
                result.failed(e);
            };

            return result;
        }

        /**
         * Completes the promise successfully with `result`.
         *
         * If the promise has already been completed (or is being completed)
         * the call is ignored and a warning is logged.
         *
         * TODO:
         *     1) keep this operation atomic
         *     2) return a flag to indicate whether this option is successful.
         */
        void succeeded(T result) {
            if (cas(&_isCompleting, false, true)) {
                // Store the value before onCompleted() publishes _isCompleted.
                _result = result;
                onCompleted();
            } else {
                warning("This promise has been done, and can't be set again.");
            }
        }
        private T _result;
    }

    /**
     * Completes the promise with a failure.
     *
     * If the promise has already been completed (or is being completed)
     * the call is ignored and a warning is logged.
     *
     * Params:
     *  cause = the reason of the failure.
     *
     * TODO:
     *     1) keep this operation atomic
     *     2) return a flag to indicate whether this option is successful.
     */
    void failed(Exception cause) {
        if (cas(&_isCompleting, false, true)) {
            _cause = cause;
            onCompleted();
        } else {
            warningf("This promise has been done, and can't be set again. cause: %s",
                causeName());
        }
    }

    /**
     * Completes the promise with a `CancellationException`.
     *
     * Params:
     *  mayInterruptIfRunning = not used by this implementation.
     *
     * Returns: `true` if this call completed the promise, `false` if it had
     *          already been completed (or was being completed).
     */
    bool cancel(bool mayInterruptIfRunning) {
        if (cas(&_isCompleting, false, true)) {
            static if(!is(T == void)) {
                _result = T.init;
            }
            _cause = new CancellationException("");
            onCompleted();
            return true;
        }
        return false;
    }

    /**
     * Publishes the completion: marks the promise as completed, wakes every
     * thread blocked in `get`, then runs the matching `then` handler (while
     * still holding `_waiterLocker`).
     */
    private void onCompleted() {
        _waiterLocker.lock();
        scope(exit) {
            _waiterLocker.unlock();
        }

        _isCompleted = true;
        _waiterCondition.notifyAll();

        if(_cause is null) {
            if(_thenSucceededHandler !is null) {
                static if(is(T == void)) {
                    _thenSucceededHandler();
                } else {
                    _thenSucceededHandler(_result);
                }
            }
        } else {
            if(_thenFailedHandler !is null) {
                _thenFailedHandler(_cause);
            }
        }
    }

    /**
     * Name of the dynamic type of `_cause`, or "null" when no cause is set.
     *
     * Evaluating `typeid` on a null class reference dereferences it, so the
     * diagnostics must not call `typeid(_cause)` unguarded.
     */
    private string causeName() {
        Exception current = _cause;
        return current is null ? "null" : typeid(current).name;
    }

    /**
     * Returns: `true` if the promise is completed and its cause is a
     *          `CancellationException`; `false` otherwise (including when it
     *          succeeded and therefore has no cause).
     */
    bool isCancelled() {
        // Same test as get(); the cast yields null for a null or unrelated cause.
        return _isCompleted && (cast(CancellationException) _cause) !is null;
    }

    /// Returns: `true` once the promise has been completed in any way.
    bool isDone() {
        return _isCompleted;
    }

    /**
     * Waits without a time limit for the promise to complete.
     *
     * See `get(Duration)` for the return value and thrown exceptions.
     */
    T get() {
        return get(-1.msecs);
    }

    /**
     * Waits for the promise to complete and returns its result.
     *
     * Params:
     *  timeout = maximum time to wait; a negative duration waits forever.
     *
     * Returns: the value passed to `succeeded` (nothing for `void`).
     *
     * Throws:
     *  CancellationException if the promise was cancelled.
     *  ExecutionException wrapping the cause if the promise failed. When the
     *  timeout elapses first, this call itself fails the promise with a
     *  `TimeoutException`, which is reported the same way.
     */
    T get(Duration timeout) {
        // waiting for the completion
        if(!_isCompleted) {
            _waiterLocker.lock();
            scope(exit) {
                _waiterLocker.unlock();
            }

            // The flag is re-tested under the lock on every iteration: the
            // promise may have completed between the unlocked test above and
            // lock(), and a condition wait may wake up spuriously.
            if(timeout.isNegative()) {
                version (HUNT_DEBUG) info("Waiting for a promise...");
                while(!_isCompleted) {
                    _waiterCondition.wait();
                }
            } else {
                version (HUNT_DEBUG) {
                    infof("Waiting for a promise in %s...", timeout);
                }
                // Measure elapsed time instead of computing "now + timeout",
                // which could overflow for very large timeouts.
                const MonoTime startedAt = MonoTime.currTime;
                while(!_isCompleted) {
                    Duration elapsed = MonoTime.currTime - startedAt;
                    if(elapsed < timeout) {
                        _waiterCondition.wait(timeout - elapsed);
                        continue;
                    }

                    debug warningf("Timeout for a promise in %s...", timeout);
                    if (cas(&_isCompleting, false, true)) {
                        // Set the cause first, then publish through the common
                        // path so other waiters and the failure handler are
                        // notified. The mutex is recursive, so re-locking in
                        // onCompleted() is safe here.
                        _cause = new TimeoutException("Timeout in " ~ timeout.toString());
                        onCompleted();
                    } else {
                        // Another thread won the cas but has not reached
                        // onCompleted() yet (it needs the lock held here).
                        // Release the lock and let it publish its outcome.
                        _waiterCondition.wait();
                    }
                }
            }

            if(_cause is null) {
                version (HUNT_DEBUG) infof("Got a succeeded promise.");
            } else {
                version (HUNT_DEBUG) warningf("Got a failed promise: %s", typeid(_cause));
            }
        }

        // succeeded
        if (_cause is null) {
            static if(is(T == void)) {
                return;
            } else {
                return _result;
            }
        }

        CancellationException c = cast(CancellationException) _cause;
        if (c !is null) {
            version(HUNT_DEBUG) info("A promise cancelled.");
            throw c;
        }

        debug warning("Get a exception in a promise: ", _cause.msg);
        version (HUNT_DEBUG) warning(_cause);
        throw new ExecutionException(_cause);
    }

    /// Returns: a debug string with the object hash, completion flag,
    ///          whether the cause is null and, for non-void `T`, the result.
    override string toString() {
        static if(is(T == void)) {
            return format("FutureCallback@%x{%b, %b, void}", toHash(), _isCompleted, _cause is null);
        } else {
            return format("FutureCallback@%x{%b, %b, %s}", toHash(), _isCompleted, _cause is null, _result);
        }
    }
}