1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.concurrency.IteratingCallback;
13 
14 import hunt.concurrency.Locker;
15 import hunt.util.Common;
16 import hunt.Exceptions;
17 
18 import std.format;
19 
20 
21 /**
22  * This specialized callback implements a pattern that allows
23  * a large job to be broken into smaller tasks using iteration
24  * rather than recursion.
25  * <p>
26  * A typical example is the write of a large content to a socket,
27  * divided in chunks. Chunk C1 is written by thread T1, which
28  * also invokes the callback, which writes chunk C2, which invokes
29  * the callback again, which writes chunk C3, and so forth.
30  * </p>
31  * <p>
32  * The problem with the example is that if the callback thread
33  * is the same that performs the I/O operation, then the process
34  * is recursive and may result in a stack overflow.
35  * To avoid the stack overflow, a thread dispatch must be performed,
36  * causing context switching and cache misses, affecting performance.
37  * </p>
38  * <p>
 * To avoid this issue, this callback records in its internal state
 * whether the success callback has been called during the processing
 * of a sub task, and if so then the processing iterates rather than
 * recursing.
43  * </p>
44  * <p>
45  * Subclasses must implement method {@link #process()} where the sub
46  * task is executed and a suitable {@link IteratingCallback.Action} is
47  * returned to this callback to indicate the overall progress of the job.
 * This callback is passed to the asynchronous execution of each sub
 * task and a call to {@link #succeeded()} on this callback represents
 * the completion of the sub task.
51  * </p>
52  */
abstract class IteratingCallback : Callback {
    /**
     * The internal states of this callback.
     */
    private enum State {
        /**
         * This callback is IDLE, ready to iterate.
         */
        IDLE,

        /**
         * This callback is iterating calls to {@link #process()} and is dealing with
         * the returns.
         */
        PROCESSING,

        /**
         * Waiting for a scheduled callback.
         */
        PENDING,

        /**
         * Called by a scheduled callback.
         */
        CALLED,

        /**
         * The overall job has succeeded as indicated by a {@link Action#SUCCEEDED} return
         * from {@link IteratingCallback#process()}.
         */
        SUCCEEDED,

        /**
         * The overall job has failed as indicated by a call to {@link IteratingCallback#failed(Exception)}.
         */
        FAILED,

        /**
         * This callback has been closed and cannot be reset.
         */
        CLOSED
    }

    /**
     * The indication of the overall progress of the overall job that
     * implementations of {@link #process()} must return.
     */
    protected enum Action {
        /**
         * Indicates that {@link #process()} has no more work to do,
         * but the overall job is not completed yet, probably waiting
         * for additional events to trigger more work.
         */
        IDLE,

        /**
         * Indicates that {@link #process()} is executing asynchronously
         * a sub task, where the execution has started but the callback
         * may have not yet been invoked.
         */
        SCHEDULED,

        /**
         * Indicates that {@link #process()} has completed the overall job.
         */
        SUCCEEDED
    }

    // Lock taken by processing() while it acts on the result of process().
    private Locker _locker;
    // Current position in the state machine described by State.
    private State _state;
    // Set when iterate() is called while already PROCESSING, so that an
    // Action.IDLE result triggers another round instead of going idle.
    private bool _iterate;


    /**
     * Creates a callback that starts in the IDLE state.
     */
    protected this() {
        _locker = new Locker();
        _state = State.IDLE;
    }

    /**
     * Creates a callback that starts either in the SUCCEEDED state (so that
     * {@link #reset()} must be called before it can iterate) or in the IDLE state.
     *
     * @param needReset true to start in SUCCEEDED, false to start in IDLE
     */
    protected this(bool needReset) {
        // The locker must be created here as well: processing() dereferences it
        // unconditionally, so leaving it null crashes on the first iteration.
        _locker = new Locker();
        _state = needReset ? State.SUCCEEDED : State.IDLE;
    }

    /**
     * Method called by {@link #iterate()} to process the sub task.
     * <p>
     * Implementations must start the asynchronous execution of the sub task
     * (if any) and return an appropriate action:
     * </p>
     * <ul>
     * <li>{@link Action#IDLE} when no sub tasks are available for execution
     * but the overall job is not completed yet</li>
     * <li>{@link Action#SCHEDULED} when the sub task asynchronous execution
     * has been started</li>
     * <li>{@link Action#SUCCEEDED} when the overall job is completed</li>
     * </ul>
     *
     * @return the appropriate Action
     * @throws Exception if the sub task processing throws
     */
    protected abstract Action process();

    /**
     * Invoked when the overall task has completed successfully.
     *
     * @see #onCompleteFailure(Exception)
     */
    protected void onCompleteSuccess() {
    }

    /**
     * Invoked when the overall task has completed with a failure.
     *
     * @param cause the throwable to indicate cause of failure
     * @see #onCompleteSuccess()
     */
    protected void onCompleteFailure(Exception cause) {
    }

    /**
     * This method must be invoked by applications to start the processing
     * of sub tasks. When the callback is IDLE, {@link #process()} is run by
     * the calling thread; when it is already PROCESSING, a flag is recorded
     * so that the running iteration performs one more round.
     * <p>
     * NOTE: the state is read and updated here without taking the locker.
     * </p>
     *
     * @throws IllegalStateException if this callback is CLOSED
     */
    void iterate() {
        bool shouldProcess = false;

        switch (_state) {
            case State.PENDING:
            case State.CALLED:
                // process() will be called when the pending callback is handled
                break;

            case State.IDLE:
                _state = State.PROCESSING;
                shouldProcess = true;
                break;

            case State.PROCESSING:
                // remember the request; processing() checks this before going idle
                _iterate = true;
                break;

            case State.FAILED:
            case State.SUCCEEDED:
                // nothing to do until reset() is called
                break;

            case State.CLOSED:
            default:
                throw new IllegalStateException(toString());
        }

        if (shouldProcess)
            processing();
    }

    /**
     * Runs {@link #process()} repeatedly, acting on each returned Action,
     * until the callback leaves the PROCESSING state.
     * <p>
     * This should only ever be called when in PROCESSING state, however a
     * failed or close call may happen concurrently, so state is not assumed.
     * </p>
     *
     * @throws IllegalStateException if the state/action combination is invalid
     */
    private void processing() {
        bool on_complete_success = false;

        // While we are processing
        processing:
        while (true) {
            // Call process to get the action that we have to take.
            Action action;
            try {
                action = process();
            } catch (Exception x) {
                failed(x);
                break;
            }

            // Act on the action we have just received, under the lock.
            Locker.Lock lock = _locker.lock();
            // Release the lock however this iteration is left (continue, break or
            // throw); without this every iteration leaked one acquisition.
            // Assumes Locker.Lock.close() unlocks, as in the try-with-resources
            // usage this code was ported from - confirm against hunt.concurrency.Locker.
            scope(exit) lock.close();

            switch (_state) {
                case State.PROCESSING: {
                    switch (action) {
                        case Action.IDLE: {
                            // Has iterate been called while we were processing?
                            if (_iterate) {
                                // yes, so skip idle and keep processing
                                _iterate = false;
                                _state = State.PROCESSING;
                                continue processing;
                            }

                            // No, so we can go idle
                            _state = State.IDLE;
                            break processing;
                        }

                        case Action.SCHEDULED: {
                            // we won the race against the callback, so the callback
                            // has to process and we can break processing
                            _state = State.PENDING;
                            break processing;
                        }

                        case Action.SUCCEEDED: {
                            // process() reported that the overall job is complete
                            _iterate = false;
                            _state = State.SUCCEEDED;
                            on_complete_success = true;
                            break processing;
                        }

                        default:
                            throw new IllegalStateException(format("%s[action=%s]", this, action));
                    }
                }

                case State.CALLED: {
                    switch (action) {
                        case Action.SCHEDULED: {
                            // we lost the race: succeeded() already ran, so we have
                            // to keep processing
                            _state = State.PROCESSING;
                            continue processing;
                        }

                        default:
                            throw new IllegalStateException(format("%s[action=%s]", this, action));
                    }
                }

                case State.SUCCEEDED:
                case State.FAILED:
                case State.CLOSED:
                    break processing;

                case State.IDLE:
                case State.PENDING:
                default:
                    throw new IllegalStateException(format("%s[action=%s]", this, action));
            }
        }

        // Invoked after the loop so that user code never runs while the lock is held.
        if (on_complete_success)
            onCompleteSuccess();
    }

    /**
     * Invoked when the sub task succeeds.
     * Subclasses that override this method must always remember to call
     * {@code super.succeeded()}.
     * <p>
     * NOTE: the state is read and updated here without taking the locker.
     * </p>
     *
     * @throws IllegalStateException if called while IDLE, CALLED or SUCCEEDED
     */
    override
    void succeeded() {
        bool shouldProcess = false;

        switch (_state) {
            case State.PROCESSING: {
                // the callback ran before process() returned; processing() will notice
                _state = State.CALLED;
                break;
            }
            case State.PENDING: {
                // process() already returned SCHEDULED, so this thread resumes the iteration
                _state = State.PROCESSING;
                shouldProcess = true;
                break;
            }
            case State.CLOSED:
            case State.FAILED: {
                // Too late!
                break;
            }
            default: {
                throw new IllegalStateException(toString());
            }
        }

        if (shouldProcess)
            processing();
    }

    /**
     * Invoked when the sub task fails.
     * Subclasses that override this method must always remember to call
     * {@code super.failed(Exception)}.
     * <p>
     * Only a callback that is PENDING or PROCESSING moves to FAILED and
     * reports through {@link #onCompleteFailure(Exception)}; in any other
     * state the failure is ignored.
     * NOTE: the state is read and updated here without taking the locker.
     * </p>
     *
     * @param x the cause of the failure
     */
    override
    void failed(Exception x) {
        bool failure = false;

        switch (_state) {
            case State.SUCCEEDED:
            case State.FAILED:
            case State.IDLE:
            case State.CLOSED:
            case State.CALLED:
                // too late!
                break;

            case State.PENDING:
            case State.PROCESSING: {
                _state = State.FAILED;
                failure = true;
                break;
            }
            default:
                throw new IllegalStateException(toString());
        }

        if (failure)
            onCompleteFailure(x);
    }

    /**
     * @return always false
     */
    bool isNonBlocking() {
        return false;
    }

    /**
     * Closes this callback. If it was PROCESSING, PENDING or CALLED the
     * close is reported as a failure via {@link #onCompleteFailure(Exception)}
     * with a ClosedChannelException; closing an already CLOSED callback is a no-op.
     * <p>
     * NOTE: the state is read and updated here without taking the locker.
     * </p>
     */
    void close() {
        bool failure = false;

        switch (_state) {
            case State.IDLE:
            case State.SUCCEEDED:
            case State.FAILED:
                _state = State.CLOSED;
                break;

            case State.CLOSED:
                break;

            default:
                // work was still in flight, so report the close as a failure
                _state = State.CLOSED;
                failure = true;
                break;
        }

        if (failure)
            onCompleteFailure(new ClosedChannelException(""));
    }

    /*
     * only for testing
     * @return whether this callback is idle and {@link #iterate()} needs to be called
     */
    bool isIdle() {
        return _state == State.IDLE;
    }

    /**
     * @return whether this callback has been closed
     */
    bool isClosed() {
        return _state == State.CLOSED;
    }

    /**
     * @return whether this callback has failed
     */
    bool isFailed() {
        return _state == State.FAILED;
    }

    /**
     * @return whether this callback has succeeded
     */
    bool isSucceeded() {
        return _state == State.SUCCEEDED;
    }

    /**
     * Resets this callback.
     * <p>
     * A callback can only be reset to IDLE from the
     * SUCCEEDED or FAILED states or if it is already IDLE.
     * NOTE: the state is read and updated here without taking the locker.
     * </p>
     *
     * @return true if the reset was successful
     */
    bool reset() {
        switch (_state) {
            case State.IDLE:
                return true;

            case State.SUCCEEDED:
            case State.FAILED:
                _iterate = false;
                _state = State.IDLE;
                return true;

            default:
                return false;
        }
    }

    /**
     * @return the base class description followed by the current state in brackets
     */
    override
    string toString() {
        return format("%s[%s]", super.toString(), _state);
    }
}