1 /* 2 * Hunt - A refined core library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.concurrency.FuturePromise; 13 14 import hunt.concurrency.Future; 15 import hunt.concurrency.Promise; 16 17 import hunt.Exceptions; 18 import hunt.logging.ConsoleLogger; 19 20 import core.atomic; 21 import core.thread; 22 import core.sync.mutex; 23 import core.sync.condition; 24 25 import std.format; 26 import std.datetime; 27 28 /** 29 * 30 */ 31 class FuturePromise(T) : Future!T, Promise!T { 32 private shared bool _isCompleting = false; 33 private bool _isCompleted = false; 34 private Exception _cause; 35 private string _id; 36 private Mutex _waiterLocker; 37 private Condition _waiterCondition; 38 39 this() { 40 _waiterLocker = new Mutex(this); 41 _waiterCondition = new Condition(_waiterLocker); 42 } 43 44 string id() { 45 return _id; 46 } 47 48 void id(string id) { 49 _id = id; 50 } 51 52 static if(is(T == void)) { 53 54 /** 55 * TODO: 56 * 1) keep this operation atomic 57 * 2) return a flag to indicate whether this option is successful. 58 */ 59 void succeeded() { 60 if (cas(&_isCompleting, false, true)) { 61 // _cause = COMPLETED; 62 onCompleted(); 63 } else { 64 warningf("This promise has been done, and can't be set again. cause: %s", 65 typeid(_cause)); 66 } 67 } 68 69 } else { 70 71 /** 72 * TODO: 73 * 1) keep this operation atomic 74 * 2) return a flag to indicate whether this option is successful. 75 */ 76 void succeeded(T result) { 77 if (cas(&_isCompleting, false, true)) { 78 _result = result; 79 onCompleted(); 80 } else { 81 warning("This promise has been done, and can't be set again."); 82 } 83 } 84 private T _result; 85 } 86 87 /** 88 * TODO: 89 * 1) keep this operation atomic 90 * 2) return a flag to indicate whether this option is successful. 91 */ 92 void failed(Exception cause) { 93 if (cas(&_isCompleting, false, true)) { 94 _cause = cause; 95 onCompleted(); 96 } else { 97 warningf("This promise has been done, and can't be set again. cause: %s", 98 typeid(_cause)); 99 } 100 } 101 102 bool cancel(bool mayInterruptIfRunning) { 103 if (cas(&_isCompleting, false, true)) { 104 static if(!is(T == void)) { 105 _result = T.init; 106 } 107 _cause = new CancellationException(""); 108 onCompleted(); 109 return true; 110 } 111 return false; 112 } 113 114 private void onCompleted() { 115 _waiterLocker.lock(); 116 _isCompleted = true; 117 scope(exit) { 118 _waiterLocker.unlock(); 119 } 120 _waiterCondition.notifyAll(); 121 } 122 123 bool isCancelled() { 124 if (_isCompleted) { 125 try { 126 // _latch.await(); 127 // TODO: Tasks pending completion -@zhangxueping at 2019-12-26T15:18:42+08:00 128 // 129 } catch (InterruptedException e) { 130 throw new RuntimeException(e.msg); 131 } 132 return typeid(_cause) == typeid(CancellationException); 133 } 134 return false; 135 } 136 137 bool isDone() { 138 return _isCompleted; 139 } 140 141 T get() { 142 return get(-1.msecs); 143 } 144 145 T get(Duration timeout) { 146 // waitting for the completion 147 if(!_isCompleted) { 148 _waiterLocker.lock(); 149 scope(exit) { 150 _waiterLocker.unlock(); 151 } 152 153 if(timeout.isNegative()) { 154 version (HUNT_DEBUG) info("Waiting for a promise..."); 155 _waiterCondition.wait(); 156 } else { 157 version (HUNT_DEBUG) { 158 infof("Waiting for a promise in %s...", timeout); 159 } 160 bool r = _waiterCondition.wait(timeout); 161 if(!r) { 162 debug warningf("Timeout for a promise in %s...", timeout); 163 if (cas(&_isCompleting, false, true)) { 164 _isCompleted = true; 165 _cause = new TimeoutException("Timeout in " ~ timeout.toString()); 166 } 167 } 168 } 169 170 if(_cause is null) { 171 version (HUNT_DEBUG) infof("Got a succeeded promise."); 172 } else { 173 version (HUNT_DEBUG) warningf("Got a failed promise: %s", typeid(_cause)); 174 } 175 } 176 177 // succeeded 178 if (_cause is null) { 179 static if(is(T == void)) { 180 return; 181 } else { 182 return _result; 183 } 184 } 185 186 CancellationException c = cast(CancellationException) _cause; 187 if (c !is null) { 188 version(HUNT_DEBUG) info("A promise cancelled."); 189 throw c; 190 } 191 192 debug warning("Get a exception in a promise: ", _cause.msg); 193 version (HUNT_DEBUG) warning(_cause); 194 throw new ExecutionException(_cause); 195 } 196 197 override string toString() { 198 static if(is(T == void)) { 199 return format("FutureCallback@%x{%b, %b, void}", toHash(), _isCompleted, _cause is null); 200 } else { 201 return format("FutureCallback@%x{%b, %b, %s}", toHash(), _isCompleted, _cause is null, _result); 202 } 203 } 204 }