1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.concurrency.FuturePromise;
13 
14 import hunt.concurrency.Future;
15 import hunt.concurrency.Promise;
16 
17 import hunt.Exceptions;
18 import hunt.logging;
19 
20 import core.atomic;
21 import core.thread;
22 import core.sync.mutex;
23 import core.sync.condition;
24 
25 import std.conv;
26 import std.format;
27 import std.datetime;
28 
/**
 * Continuation callback type: a delegate that receives a single value of
 * type `T` and returns nothing.
 */
template ThenHandler(T) {
	alias ThenHandler = void delegate(T);
}
30 
/**
 * A write-once result holder that implements both `Future!T` (blocking
 * `get`, `isDone`, `isCancelled`, `cancel`) and `Promise!T` (`succeeded`,
 * `failed`).
 *
 * The first caller that wins the compare-and-swap on `_isCompleting` decides
 * the outcome (success, failure, cancellation or a `get` timeout); every later
 * attempt is rejected and reported through the logger. The outcome is
 * published under `_waiterLocker`, which also guards the continuation
 * handlers installed by `then`.
 */
class FuturePromise(T) : Future!T, Promise!T {
	alias VoidHandler = void delegate();
	
	// Claimed exactly once (via cas) by whichever call completes the promise.
	private shared bool _isCompleting = false;
	// Becomes true, under _waiterLocker, once the outcome has been published.
	private bool _isCompleted = false;
	// Null for a successful outcome; otherwise the failure/cancel/timeout reason.
	private Throwable _cause;
	private string _id;
	private Mutex _waiterLocker;
	private Condition _waiterCondition;

	/**
	 * Params:
	 *   id = optional label used only in log messages and returned by `id()`.
	 */
	this(string id="") {
		_id = id;
		_waiterLocker = new Mutex(this);
		_waiterCondition = new Condition(_waiterLocker);
	}

	/// Returns: the label given at construction time.
	string id() {
		return _id;
	}

	// void id(string id) {
	// 	_id = id;
	// }

	/// Continuation invoked with the cause when the promise does not succeed.
	ThenHandler!(Throwable) _thenFailedHandler;

static if(is(T == void)) {
	/// Continuation invoked when the promise succeeds.
	VoidHandler _thenSucceededHandler;

	/**
	 * Chains a continuation onto this promise.
	 *
	 * On success `handler` is called and its outcome completes the returned
	 * promise; an `Exception` thrown by `handler`, or a failure of this
	 * promise, fails the returned promise with an `Exception` whose message
	 * is "then exception" and whose chained cause is the original throwable.
	 * Calling `then` again replaces the previously installed continuation.
	 * If this promise is already completed the continuation runs
	 * immediately on the calling thread.
	 *
	 * Params:
	 *   handler = callback to run after this promise succeeds.
	 * Returns: a new promise carrying the outcome of `handler`.
	 */
	FuturePromise!R then(R)(R delegate() handler) {
		FuturePromise!R result = new FuturePromise!(R);

		// Install the handlers under the same lock that publishes completion,
		// so a concurrent completion fires them exactly once.
		_waiterLocker.lock();
		scope(exit) {
			_waiterLocker.unlock();
		}

		_thenSucceededHandler = () {
			try {
				// A void-returning handler has no value to forward.
				static if(is(R == void)) {
					handler();
					result.succeeded();
				} else {
					R r = handler();
					result.succeeded(r);
				}
			} catch(Exception ex) {
				Exception e = new Exception("then exception", ex);
				result.failed(e);
			}
		};

		_thenFailedHandler = (Throwable ex) {
			Exception e = new Exception("then exception", ex);
			result.failed(e);
		};

		// The outcome was published before the handlers existed: run them now.
		if(_isCompleted) {
			invokeThenHandlers();
		}

		return result;
	}

	/**
	 * Completes the promise successfully.
	 *
	 * Returns: true if this call completed the promise; false if it had
	 *          already been completed (a warning is logged in that case).
	 */
	bool succeeded() {
		if (cas(&_isCompleting, false, true)) {
			onCompleted();
			return true;
		} else {
			warningf("This promise has completed, and can't be set again. cause: %s", 
				_cause is null ? "null" : _cause.msg);
			return false;
		}
	}

} else {
	/// Continuation invoked with the result when the promise succeeds.
	ThenHandler!(T) _thenSucceededHandler;

	/**
	 * Chains a continuation onto this promise.
	 *
	 * On success `handler` is called with the result and its outcome
	 * completes the returned promise; an `Exception` thrown by `handler`, or
	 * a failure of this promise, fails the returned promise with an
	 * `Exception` whose message is "then exception" and whose chained cause
	 * is the original throwable. Calling `then` again replaces the previously
	 * installed continuation. If this promise is already completed the
	 * continuation runs immediately on the calling thread.
	 *
	 * Params:
	 *   handler = callback to run with the result of this promise.
	 * Returns: a new promise carrying the outcome of `handler`.
	 */
	FuturePromise!R then(R)(R delegate(T) handler) {
		FuturePromise!R result = new FuturePromise!(R);

		// Install the handlers under the same lock that publishes completion,
		// so a concurrent completion fires them exactly once.
		_waiterLocker.lock();
		scope(exit) {
			_waiterLocker.unlock();
		}

		_thenSucceededHandler = (T t) {
			try {
				static if(is(R == void)) {
					handler(t);
					result.succeeded();
				} else {
					R r = handler(t);
					result.succeeded(r);
				}
			} catch(Exception ex) {
				Exception e = new Exception("then exception", ex);
				result.failed(e);
			}
		};

		_thenFailedHandler = (Throwable ex) {
			Exception e = new Exception("then exception", ex);
			result.failed(e);
		};

		// The outcome was published before the handlers existed: run them now.
		if(_isCompleted) {
			invokeThenHandlers();
		}

		return result;
	}

	/**
	 * Completes the promise successfully with `result`.
	 *
	 * Params:
	 *   result = the value handed to waiters and to the success continuation.
	 * Returns: true if this call completed the promise; false if it had
	 *          already been completed (a warning is logged in that case).
	 */
	bool succeeded(T result) {
		version(HUNT_DEBUG_MORE) tracef("completing promise [%s]", id());

		if (cas(&_isCompleting, false, true)) {
			// Store the value before publishing so waiters never see a stale one.
			_result = result;
			onCompleted();
			return true;
		} else {
			warningf("The promise [%s] has completed, and can't be set again. Result: %s, Cause: %s", id(), 
				_result.to!string(),  _cause is null ? "null" : _cause.msg);
			return false;
		}
	}
	private T _result;
}

	/**
	 * Completes the promise with a failure.
	 *
	 * Params:
	 *   cause = the reason for the failure. A null cause is replaced by a
	 *           generic `Exception`, because a null `_cause` is how this
	 *           class represents success.
	 * Returns: true if this call completed the promise; false if it had
	 *          already been completed (a warning is logged in that case).
	 */
	bool failed(Throwable cause) {
		if (cas(&_isCompleting, false, true)) {
			_cause = (cause is null) ? new Exception("Promise failed with a null cause") : cause;
			onCompleted();
			return true;
		} else {
			warningf("This promise [%s] has been done, and can't be set again. cause: %s", 
				id(), (cause is null) ? "null" : cause.msg);
			return false;
		}
	}

	/**
	 * Completes the promise with a `CancellationException`.
	 *
	 * Params:
	 *   mayInterruptIfRunning = not used by this implementation.
	 * Returns: true if this call cancelled the promise; false if it had
	 *          already been completed.
	 */
	bool cancel(bool mayInterruptIfRunning) {
		if (cas(&_isCompleting, false, true)) {
			static if(!is(T == void)) {
				_result = T.init;
			}
			_cause = new CancellationException("");
			onCompleted();
			return true;
		}
		return false;
	}

	/// Publishes the outcome: takes the waiter lock and marks the promise done.
	private void onCompleted() {

		version(HUNT_DEBUG_MORE) tracef("Promise [%s] completed", id());

		_waiterLocker.lock();
		scope(exit) {
			_waiterLocker.unlock();
		}

		publishCompletion();
	}

	/**
	 * Marks the promise completed, wakes every thread blocked in `get`, and
	 * runs the installed continuation. The caller must hold `_waiterLocker`.
	 */
	private void publishCompletion() {
		_isCompleted = true;
		_waiterCondition.notifyAll();
		invokeThenHandlers();
	}

	/**
	 * Runs the success continuation when there is no cause, otherwise the
	 * failure continuation; does nothing if the matching handler is not set.
	 * The caller must hold `_waiterLocker`.
	 */
	private void invokeThenHandlers() {
		if(_cause is null) {
			if(_thenSucceededHandler !is null) {
				static if(is(T == void)) {
					_thenSucceededHandler();
				} else {
					_thenSucceededHandler(_result);
				}
			}
		} else if(_thenFailedHandler !is null) {
			_thenFailedHandler(_cause);
		}
	}

	/**
	 * Returns: true only when the promise is completed and its cause is a
	 *          `CancellationException` (or a subclass of it).
	 */
	bool isCancelled() {
		// The cast yields null for a null or unrelated cause, so a successfully
		// completed promise is handled safely; get() uses the same test.
		return _isCompleted && (cast(CancellationException) _cause) !is null;
	}

	/// Returns: true once an outcome has been published.
	bool isDone() {
		return _isCompleted;
	}

	/**
	 * Blocks until the promise is completed, without a time limit.
	 * See `get(Duration)` for the return value and thrown exceptions.
	 */
	T get() {
		return get(-1.msecs);
	}

	/**
	 * Blocks until the promise is completed or `timeout` elapses.
	 *
	 * If the timeout elapses first and nobody else is completing the
	 * promise, the promise is permanently completed with a
	 * `TimeoutException`; other waiters and the failure continuation are
	 * notified of it.
	 *
	 * Params:
	 *   timeout = maximum time to wait; a negative duration waits forever.
	 * Returns: the result (nothing when `T` is `void`).
	 * Throws: `CancellationException` if the promise was cancelled;
	 *         `ExecutionException` wrapping the cause for any other failure,
	 *         including a timeout.
	 */
	T get(Duration timeout) {
		awaitCompletion(timeout);

		// succeeded
		if (_cause is null) {
			static if(is(T == void)) {
				return;
			} else {
				return _result;
			}
		}

		CancellationException c = cast(CancellationException) _cause;
		if (c !is null) {
			version(HUNT_DEBUG) info("A promise cancelled.");
			throw c;
		}
		
		debug warningf("Get a exception in promise %s: %s", id(), _cause.msg);
		// version (HUNT_DEBUG) warning(_cause);
		throw new ExecutionException(_cause);
	}	

	/**
	 * Waits, under `_waiterLocker`, until `_isCompleted` is set or `timeout`
	 * runs out. Returns with the promise completed in every case.
	 */
	private void awaitCompletion(Duration timeout) {
		import core.time : MonoTime;

		// The flag is tested only while holding the lock, so a completion
		// cannot slip in between the test and the wait.
		_waiterLocker.lock();
		scope(exit) {
			_waiterLocker.unlock();
		}

		if(_isCompleted) {
			return;
		}

		if(timeout.isNegative()) {
			version (HUNT_DEBUG) infof("Waiting for promise [%s]...", id());
			// Re-check after every wakeup; a wakeup alone does not prove completion.
			while(!_isCompleted) {
				_waiterCondition.wait();
			}
		} else {
			version (HUNT_DEBUG_MORE) {
				tracef("Waiting for promise [%s] in %s...", id(), timeout);
			}

			// Work with elapsed time rather than an absolute deadline so a
			// very large timeout cannot overflow.
			const MonoTime startedAt = MonoTime.currTime;
			while(!_isCompleted) {
				Duration remaining = timeout - (MonoTime.currTime - startedAt);
				if(remaining <= Duration.zero || !_waiterCondition.wait(remaining)) {
					break;
				}
			}

			if(!_isCompleted) {
				version(HUNT_DEBUG_MORE) {
					warningf("Timeout for promise [%s] in %s...  isCompleting=%s", id(), timeout, 
						atomicLoad(_isCompleting));
				}

				if (cas(&_isCompleting, false, true)) {
					// This waiter decides the outcome: the promise has timed out.
					string str = format("Promise [%s] timeout in %s", id(), timeout);
					_cause = new TimeoutException(str);
					publishCompletion();
				} else {
					// Another thread already claimed completion and is about to
					// publish it; wait for that so the outcome read below is final.
					while(!_isCompleted) {
						_waiterCondition.wait();
					}

					static if(is(T == void)) {
						warningf("It seems that a result has got in promise %s.", id());
					} else {
						warningf("It seems that a result has got in promise %s: %s", id(), _result.to!string());
					}
				}
			}
		}

		if(_cause is null) {
			version (HUNT_DEBUG_MORE) tracef("Got a succeeded promise [%s].", id());
		} else {
			version (HUNT_DEBUG) warningf("Got a failed promise [%s]: %s", id(), _cause.msg);
		}
	}

	/// Returns: a debug string with the hash, completion flag, success flag and result.
	override string toString() {
		static if(is(T == void)) {
			return format("FutureCallback@%x{%b, %b, void}", toHash(), _isCompleted, _cause is null);
		} else {
			return format("FutureCallback@%x{%b, %b, %s}", toHash(), _isCompleted, _cause is null, _result);
		}
	}
}