1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.concurrency.FuturePromise;
13 
14 import hunt.concurrency.Future;
15 import hunt.concurrency.Promise;
16 
17 import hunt.Exceptions;
18 import hunt.logging.ConsoleLogger;
19 
20 import core.atomic;
21 import core.thread;
22 import core.sync.mutex;
23 import core.sync.condition;
24 
25 import std.format;
26 import std.datetime;
27 
/**
 * Callback type used for promise continuations: a delegate that receives a
 * single value of type `T` and returns nothing.
 */
template ThenHandler(T) {
	alias ThenHandler = void delegate(T);
}
29 
/**
 * A one-shot result holder that is both a `Future` (consumers block in
 * `get`) and a `Promise` (producers call `succeeded`, `failed` or `cancel`).
 *
 * The first caller that wins the atomic `_isCompleting` flag decides the
 * outcome; every later completion attempt is rejected. Waiters are parked on
 * a condition variable guarded by `_waiterLocker`.
 *
 * A single continuation can be attached with `then`; a later call to `then`
 * replaces the handlers registered by an earlier one.
 */
class FuturePromise(T) : Future!T, Promise!T {
	alias VoidHandler = void delegate();

	static if(is(T == void)) {
		private alias SucceededHandler = VoidHandler;
	} else {
		private alias SucceededHandler = ThenHandler!(T);
	}

	// Claimed with cas() by whichever thread gets to complete the promise.
	private shared bool _isCompleting = false;
	// Set under _waiterLocker once the outcome (_result / _cause) is published.
	private bool _isCompleted = false;
	// Null for a successful completion, otherwise the failure/cancel/timeout reason.
	private Exception _cause;
	private string _id;
	private Mutex _waiterLocker;
	private Condition _waiterCondition;

	this() {
		_waiterLocker = new Mutex(this);
		_waiterCondition = new Condition(_waiterLocker);
	}

	/// Returns: the user-assigned identifier (null unless set).
	string id() {
		return _id;
	}

	/// Assigns a user-defined identifier to this promise.
	void id(string id) {
		_id = id;
	}

	/// Continuation invoked with the cause when the promise ends in failure.
	ThenHandler!(Exception) _thenFailedHandler;

static if(is(T == void)) {
	/// Continuation invoked when the promise succeeds.
	VoidHandler _thenSucceededHandler;

	/**
	 * Chains a continuation and returns the promise of its outcome.
	 *
	 * On success `handler` is run and its return value (if any) completes the
	 * returned promise; an exception thrown by `handler`, or a failure of this
	 * promise, fails the returned promise with a wrapping "then exception".
	 * If this promise is already completed the continuation runs immediately
	 * on the calling thread.
	 *
	 * Params:
	 *   handler = the work to run after this promise succeeds
	 * Returns: a new promise completed by the continuation
	 */
	FuturePromise!R then(R)(R delegate() handler) {
		FuturePromise!R result = new FuturePromise!(R);

		VoidHandler onSuccess = () {
			try {
				// A void handler has no value to forward.
				static if(is(R == void)) {
					handler();
					result.succeeded();
				} else {
					R r = handler();
					result.succeeded(r);
				}
			} catch(Exception ex) {
				Exception e = new Exception("then exception", ex);
				result.failed(e);
			}
		};

		ThenHandler!(Exception) onFailure = (Exception ex) {
			Exception e = new Exception("then exception", ex);
			result.failed(e);
		};

		registerHandlers(onSuccess, onFailure);
		return result;
	}

	/**
	 * Completes the promise successfully.
	 *
	 * Only the first completion attempt takes effect; later ones are logged
	 * and ignored.
	 */
	void succeeded() {
		if (cas(&_isCompleting, false, true)) {
			onCompleted();
		} else {
			warningf("This promise has been done, and can't be set again. cause: %s", 
				causeName());
		}
	}

} else {
	/// Continuation invoked with the result when the promise succeeds.
	ThenHandler!(T) _thenSucceededHandler;

	/**
	 * Chains a continuation and returns the promise of its outcome.
	 *
	 * On success `handler` is run with the result and its return value (if
	 * any) completes the returned promise; an exception thrown by `handler`,
	 * or a failure of this promise, fails the returned promise with a wrapping
	 * "then exception". If this promise is already completed the continuation
	 * runs immediately on the calling thread.
	 *
	 * Params:
	 *   handler = the work to run with the result of this promise
	 * Returns: a new promise completed by the continuation
	 */
	FuturePromise!R then(R)(R delegate(T) handler) {
		FuturePromise!R result = new FuturePromise!(R);

		ThenHandler!(T) onSuccess = (T t) {
			try {
				static if(is(R == void)) {
					handler(t);
					result.succeeded();
				} else {
					R r = handler(t);
					result.succeeded(r);
				}
			} catch(Exception ex) {
				Exception e = new Exception("then exception", ex);
				result.failed(e);
			}
		};

		ThenHandler!(Exception) onFailure = (Exception ex) {
			Exception e = new Exception("then exception", ex);
			result.failed(e);
		};

		registerHandlers(onSuccess, onFailure);
		return result;
	}

	/**
	 * Completes the promise successfully with `result`.
	 *
	 * Only the first completion attempt takes effect; later ones are logged
	 * and ignored.
	 *
	 * Params:
	 *   result = the value handed to waiters and to the success continuation
	 */
	void succeeded(T result) {
		if (cas(&_isCompleting, false, true)) {
			_result = result;
			onCompleted();
		} else {
			warning("This promise has been done, and can't be set again.");
		}
	}
	private T _result;
}

	/**
	 * Completes the promise with a failure.
	 *
	 * Only the first completion attempt takes effect; later ones are logged
	 * and ignored.
	 *
	 * Params:
	 *   cause = the reason reported to waiters and to the failure continuation
	 */
	void failed(Exception cause) {
		if (cas(&_isCompleting, false, true)) {
			_cause = cause;
			onCompleted();
		} else {
			warningf("This promise has been done, and can't be set again. cause: %s", 
				causeName());
		}
	}

	/**
	 * Cancels the promise if nobody has completed it yet.
	 *
	 * Params:
	 *   mayInterruptIfRunning = not used by this implementation
	 * Returns: true if this call cancelled the promise, false if it had
	 *          already been completed (or is being completed) by someone else
	 */
	bool cancel(bool mayInterruptIfRunning) {
		if (cas(&_isCompleting, false, true)) {
			static if(!is(T == void)) {
				_result = T.init;
			}
			_cause = new CancellationException("");
			onCompleted();
			return true;
		}
		return false;
	}

	/**
	 * Stores the continuation pair and, when the promise is already completed,
	 * runs the matching one right away. Registration and the completion check
	 * happen under the waiter lock so a continuation is run exactly once,
	 * either here or by the completing thread.
	 */
	private void registerHandlers(SucceededHandler onSuccess, ThenHandler!(Exception) onFailure) {
		bool alreadyCompleted;
		{
			_waiterLocker.lock();
			scope(exit) {
				_waiterLocker.unlock();
			}
			_thenSucceededHandler = onSuccess;
			_thenFailedHandler = onFailure;
			alreadyCompleted = _isCompleted;
		}

		if(alreadyCompleted) {
			invokeHandlers(onSuccess, onFailure);
		}
	}

	/**
	 * Runs the continuation that matches the final state. Must only be called
	 * after completion and without holding the waiter lock, so that user code
	 * never runs while waiters are blocked on the lock.
	 */
	private void invokeHandlers(SucceededHandler onSuccess, ThenHandler!(Exception) onFailure) {
		if(_cause is null) {
			if(onSuccess !is null) {
				static if(is(T == void)) {
					onSuccess();
				} else {
					onSuccess(_result);
				}
			}
		} else if(onFailure !is null) {
			onFailure(_cause);
		}
	}

	/**
	 * Publishes the outcome: marks the promise completed, wakes every waiter
	 * and then runs the registered continuation outside the lock.
	 */
	private void onCompleted() {
		SucceededHandler onSuccess;
		ThenHandler!(Exception) onFailure;
		{
			_waiterLocker.lock();
			scope(exit) {
				_waiterLocker.unlock();
			}
			_isCompleted = true;
			_waiterCondition.notifyAll();
			// Snapshot under the lock; see registerHandlers().
			onSuccess = _thenSucceededHandler;
			onFailure = _thenFailedHandler;
		}

		invokeHandlers(onSuccess, onFailure);
	}

	/// Name of the dynamic type of the stored cause, or "none" when there is no cause.
	private string causeName() {
		Exception c = _cause;
		// typeid() on a null class reference must be avoided.
		return c is null ? "none" : typeid(c).toString();
	}

	/**
	 * Returns: true if the promise is completed and its cause is a
	 *          `CancellationException`; false otherwise (including success).
	 */
	bool isCancelled() {
		if (!_isCompleted) {
			return false;
		}
		return (cast(CancellationException) _cause) !is null;
	}

	/// Returns: true once an outcome (success, failure, cancel or timeout) is published.
	bool isDone() {
		return _isCompleted;
	}

	/**
	 * Waits without a time limit for the outcome.
	 *
	 * Returns: the result for a successful promise
	 * Throws: see `get(Duration)`
	 */
	T get() {
		return get(-1.msecs);
	}

	/**
	 * Waits for the outcome and returns the result.
	 *
	 * If `timeout` elapses before anybody completes the promise, this call
	 * completes it with a `TimeoutException`; other waiters are woken and the
	 * failure continuation is run.
	 *
	 * Params:
	 *   timeout = maximum time to wait; a negative value waits indefinitely
	 * Returns: the result for a successful promise
	 * Throws: `CancellationException` if the promise was cancelled;
	 *         `ExecutionException` wrapping the cause for any other failure
	 *         (including a timeout)
	 */
	T get(Duration timeout) {
		// Fast path: nothing to wait for once the outcome is published.
		if(!_isCompleted) {
			awaitCompletion(timeout);
		} 

		// succeeded
		if (_cause is null) {
			static if(is(T == void)) {
				return;
			} else {
				return _result;
			}
		}

		CancellationException c = cast(CancellationException) _cause;
		if (c !is null) {
			version(HUNT_DEBUG) info("A promise cancelled.");
			throw c;
		}
		
		debug warning("Get a exception in a promise: ", _cause.msg);
		version (HUNT_DEBUG) warning(_cause);
		throw new ExecutionException(_cause);
	}	

	/**
	 * Blocks until the promise is completed, re-checking the completion flag
	 * under the lock around every wait so that neither a completion racing
	 * with the caller nor a spurious wakeup can end the wait early or leave
	 * the caller parked forever.
	 */
	private void awaitCompletion(Duration timeout) {
		ThenHandler!(Exception) onFailure;
		bool timedOut = false;
		{
			_waiterLocker.lock();
			scope(exit) {
				_waiterLocker.unlock();
			}

			if(timeout.isNegative()) {
				version (HUNT_DEBUG) info("Waiting for a promise...");
				while(!_isCompleted) {
					_waiterCondition.wait();
				}
			} else {
				version (HUNT_DEBUG) {
					infof("Waiting for a promise in %s...", timeout);
				}
				MonoTime startedAt = MonoTime.currTime;
				while(!_isCompleted) {
					// Computed from the elapsed time so a huge timeout cannot overflow a deadline.
					Duration remaining = timeout - (MonoTime.currTime - startedAt);
					if(remaining > Duration.zero && _waiterCondition.wait(remaining)) {
						continue; // notified (or woken spuriously): re-check the flag
					}
					if(_isCompleted) {
						break;
					}

					debug warningf("Timeout for a promise in %s...", timeout);
					if (cas(&_isCompleting, false, true)) {
						// This waiter decides the outcome: publish the timeout.
						_cause = new TimeoutException("Timeout in " ~ timeout.toString());
						_isCompleted = true;
						_waiterCondition.notifyAll();
						onFailure = _thenFailedHandler;
						timedOut = true;
					} else {
						// Someone else already claimed completion but has not
						// published it yet; wait for the real outcome.
						while(!_isCompleted) {
							_waiterCondition.wait();
						}
					}
				}
			}

			version (HUNT_DEBUG) {
				if(_cause is null) {
					infof("Got a succeeded promise.");
				} else {
					warningf("Got a failed promise: %s", typeid(_cause));
				}
			}
		}

		// Run the failure continuation outside the lock.
		if(timedOut && onFailure !is null) {
			onFailure(_cause);
		}
	}

	/// Returns: a debug representation with the completion flag, success flag and result.
	override string toString() {
		static if(is(T == void)) {
			return format("FutureCallback@%x{%b, %b, void}", toHash(), _isCompleted, _cause is null);
		} else {
			return format("FutureCallback@%x{%b, %b, %s}", toHash(), _isCompleted, _cause is null, _result);
		}
	}
}