1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.concurrency.FuturePromise;
13 
14 import hunt.concurrency.Future;
15 import hunt.concurrency.Promise;
16 
17 import hunt.Exceptions;
18 import hunt.logging.ConsoleLogger;
19 
20 import core.atomic;
21 import core.thread;
22 import core.sync.mutex;
23 import core.sync.condition;
24 
25 import std.format;
26 import std.datetime;
27 
/**
 * A combined Future and Promise.
 *
 * A producer completes the instance exactly once through `succeeded`,
 * `failed` or `cancel`; consumers block in `get` until that happens or
 * until their timeout elapses. The first completion attempt wins (decided
 * by an atomic compare-and-swap on `_isCompleting`); later attempts are
 * ignored and only logged.
 *
 * Note that a timed `get` which expires before any completion finalises
 * the promise itself with a `TimeoutException`.
 */
class FuturePromise(T) : Future!T, Promise!T {
	// Set atomically by whichever caller wins the right to complete the promise.
	private shared bool _isCompleting = false;
	// Set (under _waiterLocker) once the result/cause has been published.
	private bool _isCompleted = false;
	// null means success; otherwise the failure, cancellation or timeout cause.
	private Exception _cause;
	private string _id;
	private Mutex _waiterLocker;
	private Condition _waiterCondition;

	this() {
		_waiterLocker = new Mutex(this);
		_waiterCondition = new Condition(_waiterLocker);
	}

	/// Returns the identifier previously assigned to this promise (may be null).
	string id() {
		return _id;
	}

	/// Assigns an identifier to this promise.
	void id(string id) {
		_id = id;
	}

static if(is(T == void)) {
	
	/**
	 * Completes the promise successfully.
	 *
	 * Has no effect other than logging a warning when the promise has
	 * already been completed (or is being completed) by another call.
	 */
	void succeeded() {
		if (cas(&_isCompleting, false, true)) {
			onCompleted();
		} else {
			warningf("This promise has been done, and can't be set again. cause: %s", 
				causeName());
		}
	}

} else {

	/**
	 * Completes the promise successfully with the given result.
	 *
	 * Has no effect other than logging a warning when the promise has
	 * already been completed (or is being completed) by another call.
	 *
	 * Params:
	 *   result = the value later returned by `get`
	 */
	void succeeded(T result) {
		if (cas(&_isCompleting, false, true)) {
			_result = result;
			onCompleted();
		} else {
			warning("This promise has been done, and can't be set again.");
		}
	}
	private T _result;
}

	/**
	 * Completes the promise with a failure.
	 *
	 * Has no effect other than logging a warning when the promise has
	 * already been completed (or is being completed) by another call.
	 *
	 * Params:
	 *   cause = the exception that `get` wraps in an `ExecutionException`
	 */
	void failed(Exception cause) {
		if (cas(&_isCompleting, false, true)) {
			_cause = cause;
			onCompleted();
		} else {
			warningf("This promise has been done, and can't be set again. cause: %s", 
				causeName());
		}
	}

	/**
	 * Cancels the promise if it has not been completed yet.
	 *
	 * Params:
	 *   mayInterruptIfRunning = not used by this implementation
	 *
	 * Returns: true if this call cancelled the promise, false if it had
	 *          already been completed (or was being completed) elsewhere.
	 */
	bool cancel(bool mayInterruptIfRunning) {
		if (cas(&_isCompleting, false, true)) {
			static if(!is(T == void)) {
				_result = T.init;
			}
			_cause = new CancellationException("");
			onCompleted();
			return true;
		}
		return false;
	}

	/// Publishes the completion and wakes up every thread blocked in `get`.
	private void onCompleted() {
		_waiterLocker.lock();
		scope(exit) {
			_waiterLocker.unlock();
		}
		_isCompleted = true;
		_waiterCondition.notifyAll();
	}

	/**
	 * Name of the dynamic type of the current cause, or "null" when there
	 * is none. `typeid` must not be evaluated on a null class reference,
	 * hence the explicit check.
	 */
	private string causeName() {
		Exception cause = _cause;
		return cause is null ? "null" : typeid(cause).name;
	}

	/**
	 * Returns: true if the promise is completed and its cause is a
	 *          `CancellationException` (the same test `get` applies).
	 */
	bool isCancelled() {
		if (!_isCompleted) {
			return false;
		}
		// A successful completion has a null cause; the cast then yields null.
		return (cast(CancellationException) _cause) !is null;
	}

	/// Returns: true once the promise has been completed in any way.
	bool isDone() {
		return _isCompleted;
	}

	/**
	 * Waits without a time limit for the completion and returns the result.
	 *
	 * Throws: see `get(Duration)`.
	 */
	T get() {
		return get(-1.msecs);
	}

	/**
	 * Waits for the completion and returns the result.
	 *
	 * Params:
	 *   timeout = maximum time to wait; a negative duration waits forever
	 *
	 * Returns: the value passed to `succeeded` (nothing when T is void).
	 *
	 * Throws:
	 *   CancellationException if the promise was cancelled;
	 *   ExecutionException wrapping the cause if the promise failed, or
	 *   wrapping a TimeoutException if the timeout elapsed first.
	 */
	T get(Duration timeout) {
		// waiting for the completion
		if(!_isCompleted) {
			awaitCompletion(timeout);
		} 

		// succeeded
		if (_cause is null) {
			static if(is(T == void)) {
				return;
			} else {
				return _result;
			}
		}

		CancellationException c = cast(CancellationException) _cause;
		if (c !is null) {
			version(HUNT_DEBUG) info("A promise cancelled.");
			throw c;
		}
		
		debug warning("Get a exception in a promise: ", _cause.msg);
		version (HUNT_DEBUG) warning(_cause);
		throw new ExecutionException(_cause);
	}	

	/**
	 * Blocks the caller until the promise is completed.
	 *
	 * The completion flag is re-checked under the lock and after every
	 * wakeup, so a completion that happens just before the lock is taken,
	 * or a spurious wakeup, cannot leave the caller blocked or let it
	 * return early. If the timeout elapses first, the promise is finalised
	 * with a TimeoutException and all other waiters are woken up.
	 *
	 * Params:
	 *   timeout = maximum time to wait; a negative duration waits forever
	 */
	private void awaitCompletion(Duration timeout) {
		_waiterLocker.lock();
		scope(exit) {
			_waiterLocker.unlock();
		}

		if(timeout.isNegative()) {
			version (HUNT_DEBUG) info("Waiting for a promise...");
			while(!_isCompleted) {
				_waiterCondition.wait();
			}
		} else {
			version (HUNT_DEBUG) {
				infof("Waiting for a promise in %s...", timeout);
			}

			// Track elapsed time rather than an absolute deadline so that a
			// very large timeout cannot overflow.
			const MonoTime startedAt = MonoTime.currTime;
			while(!_isCompleted) {
				const Duration remaining = timeout - (MonoTime.currTime - startedAt);
				if(remaining > Duration.zero && _waiterCondition.wait(remaining)) {
					// Notified or woken spuriously: re-check the flag.
					continue;
				}

				// The completion may have been published while the lock was
				// being re-acquired after the timed wait expired.
				if(_isCompleted) {
					break;
				}

				debug warningf("Timeout for a promise in %s...", timeout);
				if (cas(&_isCompleting, false, true)) {
					_cause = new TimeoutException("Timeout in " ~ timeout.toString());
					_isCompleted = true;
					// Other waiters would otherwise never be woken up.
					_waiterCondition.notifyAll();
				} else {
					// Another thread has won the completion race and is about
					// to publish; wait for it so that the result/cause is
					// fully set before it is read.
					while(!_isCompleted) {
						_waiterCondition.wait();
					}
				}
			}
		}
		
		if(_cause is null) {
			version (HUNT_DEBUG) infof("Got a succeeded promise.");
		} else {
			version (HUNT_DEBUG) warningf("Got a failed promise: %s", causeName());
		}
	}

	/// Returns: a debug representation with the completion and success flags.
	override string toString() {
		static if(is(T == void)) {
			return format("FutureCallback@%x{%b, %b, void}", toHash(), _isCompleted, _cause is null);
		} else {
			return format("FutureCallback@%x{%b, %b, %s}", toHash(), _isCompleted, _cause is null, _result);
		}
	}
}