/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.concurrency.FuturePromise;

import hunt.concurrency.Future;
import hunt.concurrency.Promise;

import hunt.Exceptions;
import hunt.logging;

import core.atomic;
import core.thread;
import core.sync.mutex;
import core.sync.condition;

import std.conv;
import std.format;
import std.datetime;

/// Callback type receiving a single value of type `T`.
alias ThenHandler(T) = void delegate(T);

/**
 * A promise that can be completed exactly once (with a value, a failure or a
 * cancellation) and a future through which callers block until that happens.
 *
 * Completion is arbitrated by a compare-and-swap on `_isCompleting`; the
 * first caller to flip it wins and every later attempt is rejected.
 * Waiters block on a condition variable bound to `_waiterLocker`.
 *
 * Continuations registered with `then` are invoked from `onCompleted`, i.e.
 * on the thread that completes the promise and while `_waiterLocker` is held.
 * A continuation registered after the promise has already completed is
 * never invoked, and only the most recently registered one is kept.
 */
class FuturePromise(T) : Future!T, Promise!T {
    alias VoidHandler = void delegate();

    // Set by the first thread that wins the right to complete the promise.
    private shared bool _isCompleting = false;
    // Set (under _waiterLocker) once the outcome has been published.
    private bool _isCompleted = false;
    // Non-null when the promise failed, was cancelled or timed out.
    private Throwable _cause;
    private string _id;
    private Mutex _waiterLocker;
    private Condition _waiterCondition;

    /**
     * Creates a pending promise.
     *
     * Params:
     *   id = optional identifier used only in log messages.
     */
    this(string id="") {
        _id = id;
        _waiterLocker = new Mutex(this);
        _waiterCondition = new Condition(_waiterLocker);
    }

    /// Returns the identifier given at construction time.
    string id() {
        return _id;
    }

    // void id(string id) {
    //     _id = id;
    // }

    /// Continuation invoked by `onCompleted` when the promise ends with a cause.
    ThenHandler!(Throwable) _thenFailedHandler;

static if(is(T == void)) {
    /// Continuation invoked by `onCompleted` when the promise succeeds.
    VoidHandler _thenSucceededHandler;

    /**
     * Chains a continuation and returns a new promise for its outcome.
     *
     * On success `handler` is run and the returned promise is completed with
     * its result; an `Exception` thrown by `handler`, or a failure of this
     * promise, fails the returned promise with a wrapping
     * `Exception("then exception", cause)`.
     *
     * Params:
     *   handler = delegate to run after this promise succeeds.
     * Returns: the promise tracking the continuation.
     */
    FuturePromise!R then(R)(R delegate() handler) {
        FuturePromise!R result = new FuturePromise!(R);
        _thenSucceededHandler = () {
            try {
                // A void-returning handler has no value to forward.
                static if(is(R == void)) {
                    handler();
                    result.succeeded();
                } else {
                    R r = handler();
                    result.succeeded(r);
                }
            } catch(Exception ex) {
                Exception e = new Exception("then exception", ex);
                result.failed(e);
            }
        };

        _thenFailedHandler = (Throwable ex) {
            Exception e = new Exception("then exception", ex);
            result.failed(e);
        };

        return result;
    }

    /**
     * Completes the promise successfully.
     *
     * Returns: `true` if this call completed the promise, `false` if it had
     *          already been completed (a warning is logged in that case).
     */
    bool succeeded() {
        if (cas(&_isCompleting, false, true)) {
            onCompleted();
            return true;
        } else {
            warningf("This promise has completed, and can't be set again. cause: %s",
                _cause is null ? "null" : _cause.msg);
            return false;
        }
    }

} else {
    /// Continuation invoked by `onCompleted` when the promise succeeds.
    ThenHandler!(T) _thenSucceededHandler;

    /**
     * Chains a continuation and returns a new promise for its outcome.
     *
     * On success `handler` is run with this promise's value and the returned
     * promise is completed with its result; an `Exception` thrown by
     * `handler`, or a failure of this promise, fails the returned promise
     * with a wrapping `Exception("then exception", cause)`.
     *
     * Params:
     *   handler = delegate to run with the value of this promise.
     * Returns: the promise tracking the continuation.
     */
    FuturePromise!R then(R)(R delegate(T) handler) {
        FuturePromise!R result = new FuturePromise!(R);
        _thenSucceededHandler = (T t) {
            try {
                static if(is(R == void)) {
                    handler(t);
                    result.succeeded();
                } else {
                    R r = handler(t);
                    result.succeeded(r);
                }
            } catch(Exception ex) {
                Exception e = new Exception("then exception", ex);
                result.failed(e);
            }
        };

        _thenFailedHandler = (Throwable ex) {
            Exception e = new Exception("then exception", ex);
            result.failed(e);
        };

        return result;
    }

    /**
     * Completes the promise successfully with `result`.
     *
     * Params:
     *   result = the value handed to waiters and to the success continuation.
     * Returns: `true` if this call completed the promise, `false` if it had
     *          already been completed (a warning is logged in that case).
     */
    bool succeeded(T result) {
        version(HUNT_DEBUG_MORE) tracef("completing promise [%s]", id());

        if (cas(&_isCompleting, false, true)) {
            _result = result;
            onCompleted();
            return true;
        } else {
            warningf("The promise [%s] has completed, and can't be set again. Result: %s, Cause: %s", id(),
                _result.to!string(), _cause is null ? "null" : _cause.msg);
            return false;
        }
    }
    private T _result;
}

    /**
     * Completes the promise with a failure.
     *
     * Params:
     *   cause = the error handed to waiters and to the failure continuation.
     * Returns: `true` if this call completed the promise, `false` if it had
     *          already been completed (a warning is logged in that case).
     */
    bool failed(Throwable cause) {
        if (cas(&_isCompleting, false, true)) {
            _cause = cause;
            onCompleted();
            return true;
        } else {
            warningf("This promise [%s] has been done, and can't be set again. cause: %s",
                id(), (cause is null) ? "null" : cause.msg);
            return false;
        }
    }

    /**
     * Cancels the promise by completing it with a `CancellationException`.
     *
     * Params:
     *   mayInterruptIfRunning = not used by this implementation.
     * Returns: `true` if this call cancelled the promise, `false` if it had
     *          already been completed.
     */
    bool cancel(bool mayInterruptIfRunning) {
        if (cas(&_isCompleting, false, true)) {
            static if(!is(T == void)) {
                _result = T.init;
            }
            _cause = new CancellationException("");
            onCompleted();
            return true;
        }
        return false;
    }

    /**
     * Publishes the outcome: marks the promise completed, wakes every waiter
     * and runs the matching continuation. Runs with `_waiterLocker` held.
     */
    private void onCompleted() {

        version(HUNT_DEBUG_MORE) tracef("Promise [%s] completed", id());

        _waiterLocker.lock();
        scope(exit) {
            _waiterLocker.unlock();
        }
        _isCompleted = true;

        _waiterCondition.notifyAll();

        if(_cause is null) {
            if(_thenSucceededHandler !is null) {
                static if(is(T == void)) {
                    _thenSucceededHandler();
                } else {
                    _thenSucceededHandler(_result);
                }
            }
        } else {
            if(_thenFailedHandler !is null) {
                _thenFailedHandler(_cause);
            }
        }
    }

    /**
     * Returns: `true` if the promise has completed and its cause is a
     *          `CancellationException`; `false` otherwise, including when it
     *          completed successfully.
     */
    bool isCancelled() {
        if (!_isCompleted) {
            return false;
        }

        // TODO: Tasks pending completion -@zhangxueping at 2019-12-26T15:18:42+08:00
        // (the original intent was to await a latch here before answering)

        // A downcast is null-safe, whereas typeid() on the null cause of a
        // successfully completed promise would dereference a null reference.
        return (cast(CancellationException) _cause) !is null;
    }

    /// Returns: `true` once an outcome has been published.
    bool isDone() {
        return _isCompleted;
    }

    /**
     * Blocks without a time limit until the promise completes.
     *
     * Returns: the value of the promise (nothing for `void`).
     * Throws: see `get(Duration)`.
     */
    T get() {
        return get(-1.msecs);
    }

    /**
     * Blocks until the promise completes or `timeout` elapses.
     *
     * If the timeout elapses first and no other thread has started
     * completing the promise, the promise itself is completed with a
     * `TimeoutException` (the `then` continuations are not run in that case).
     *
     * Params:
     *   timeout = maximum time to wait; a negative duration waits forever.
     * Returns: the value of the promise (nothing for `void`).
     * Throws: the stored `CancellationException` if the promise was
     *         cancelled; otherwise `ExecutionException` wrapping the cause
     *         (including a timeout).
     */
    T get(Duration timeout) {
        // Unlocked fast path; the authoritative check is repeated under the lock.
        if(!_isCompleted) {
            _waiterLocker.lock();
            scope(exit) {
                _waiterLocker.unlock();
            }

            if(timeout.isNegative()) {
                version (HUNT_DEBUG) info(format("Waiting for promise [%s]...", id()));
                // Re-check under the lock: the promise may have completed
                // between the fast-path test and lock(), and condition
                // variables may wake spuriously.
                while(!_isCompleted) {
                    _waiterCondition.wait();
                }
            } else {
                version (HUNT_DEBUG_MORE) {
                    tracef("Waiting for promise [%s] in %s...", id(), timeout);
                }

                MonoTime startTime = MonoTime.currTime;
                while(!_isCompleted) {
                    // Computed from elapsed time so an early or spurious
                    // wakeup never extends the overall timeout.
                    Duration remaining = timeout - (MonoTime.currTime - startTime);
                    if(remaining > Duration.zero) {
                        _waiterCondition.wait(remaining);
                        continue;
                    }

                    version(HUNT_DEBUG_MORE) {
                        warningf("Timeout for promise [%s] in %s...  isCompleting=%s", id(), timeout, _isCompleting);
                    }

                    if (cas(&_isCompleting, false, true)) {
                        string str = format("Promise [%s] timeout in %s", id(), timeout);
                        // Publish the cause before the completion flag.
                        _cause = new TimeoutException(str);
                        _isCompleted = true;
                        // Release any other thread blocked in get().
                        _waiterCondition.notifyAll();
                    } else {
                        // Another thread already won the right to complete but
                        // has not published yet (it needs this lock to do so);
                        // wait for it so the outcome read below is final.
                        while(!_isCompleted) {
                            _waiterCondition.wait();
                        }

                        static if(is(T == void)) {
                            warningf("It seems that a result has got in promise %s.", id());
                        } else {
                            warningf("It seems that a result has got in promise %s: %s", id(), _result.to!string());
                        }
                    }
                }
            }

            if(_cause is null) {
                version (HUNT_DEBUG_MORE) tracef("Got a succeeded promise [%s].", id());
            } else {
                version (HUNT_DEBUG) warningf("Got a failed promise [%s]: %s", id(), _cause.msg);
            }
        }

        // succeeded
        if (_cause is null) {
            static if(is(T == void)) {
                return;
            } else {
                return _result;
            }
        }

        // Cancellation is rethrown as-is; every other cause is wrapped.
        CancellationException c = cast(CancellationException) _cause;
        if (c !is null) {
            version(HUNT_DEBUG) info("A promise cancelled.");
            throw c;
        }

        debug warningf("Get a exception in promise %s: %s", id(), _cause.msg);
        // version (HUNT_DEBUG) warning(_cause);
        throw new ExecutionException(_cause);
    }

    /**
     * Returns: a debug string with the object hash, the completion flag,
     *          whether the cause is null and, for non-void `T`, the result.
     */
    override string toString() {
        static if(is(T == void)) {
            return format("FutureCallback@%x{%b, %b, void}", toHash(), _isCompleted, _cause is null);
        } else {
            return format("FutureCallback@%x{%b, %b, %s}", toHash(), _isCompleted, _cause is null, _result);
        }
    }
}