1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.concurrency.AbstractExecutorService;
13 
14 import hunt.concurrency.ExecutorService;
15 import hunt.concurrency.Future;
16 import hunt.concurrency.FutureTask;
17 
18 import hunt.collection.ArrayList;
19 import hunt.collection.Collection;
20 import hunt.collection.Iterator;
21 import hunt.collection.List;
22 
23 import hunt.Exceptions;
24 import hunt.logging.ConsoleLogger;
25 import hunt.util.Common;
26 import hunt.util.DateTime;
27 import hunt.util.Runnable;
28 
29 import std.datetime;
30 
31 
32 /**
33  * Provides default implementations of {@link ExecutorService}
34  * execution methods. This class implements the {@code submit},
35  * {@code invokeAny} and {@code invokeAll} methods using a
36  * {@link RunnableFuture} returned by {@code newTaskFor}, which defaults
37  * to the {@link FutureTask} class provided in this package.  For example,
38  * the implementation of {@code submit(Runnable)} creates an
39  * associated {@code RunnableFuture} that is executed and
40  * returned. Subclasses may override the {@code newTaskFor} methods
41  * to return {@code RunnableFuture} implementations other than
42  * {@code FutureTask}.
43  *
44  * <p><b>Extension example</b>. Here is a sketch of a class
45  * that customizes {@link ThreadPoolExecutor} to use
46  * a {@code CustomTask} class instead of the default {@code FutureTask}:
47  * <pre> {@code
48  * class CustomThreadPoolExecutor extends ThreadPoolExecutor {
49  *
50  *   static class CustomTask!(V) : RunnableFuture!(V) {...}
51  *
52  *   protected !(V) RunnableFuture!(V) newTaskFor(Callable!(V) c) {
53  *       return new CustomTask!(V)(c);
54  *   }
55  *   protected !(V) RunnableFuture!(V) newTaskFor(Runnable r, V v) {
56  *       return new CustomTask!(V)(r, v);
57  *   }
58  *   // ... add constructors, etc.
59  * }}</pre>
60  *
61  * @author Doug Lea
62  */
63 abstract class AbstractExecutorService : ExecutorService {
64 
65     /**
66      * Returns a {@code RunnableFuture} for the given runnable and default
67      * value.
68      *
69      * @param runnable the runnable task being wrapped
70      * @param value the default value for the returned future
71      * @param (T) the type of the given value
72      * @return a {@code RunnableFuture} which, when run, will run the
73      * underlying runnable and which, as a {@code Future}, will yield
74      * the given value as its result and provide for cancellation of
75      * the underlying task
76      */
77     static RunnableFuture!(T) newTaskFor(T)(Runnable runnable, T value) if(!is(T == void)) {
78         return new FutureTask!(T)(runnable, value);
79     }
80 
81     static RunnableFuture!(T) newTaskFor(T)(Runnable runnable) if(is(T == void)) {
82         return new FutureTask!(T)(runnable);
83     }
84 
85     /**
86      * Returns a {@code RunnableFuture} for the given callable task.
87      *
88      * @param callable the callable task being wrapped
89      * @param (T) the type of the callable's result
90      * @return a {@code RunnableFuture} which, when run, will call the
91      * underlying callable and which, as a {@code Future}, will yield
92      * the callable's result as its result and provide for
93      * cancellation of the underlying task
94      */
95     static RunnableFuture!(T) newTaskFor(T)(Callable!(T) callable) {
96         return new FutureTask!(T)(callable);
97     }
98 
99     /**
100      * @throws RejectedExecutionException {@inheritDoc}
101      * @throws NullPointerException       {@inheritDoc}
102      */
103     Future!(void) submit(Runnable task) {
104         if (task is null) throw new NullPointerException();
105         // RunnableFuture!(void) ftask = new FutureTask!(void)(task);
106         RunnableFuture!(void) ftask = newTaskFor!(void)(task);
107         execute(ftask);
108         return ftask;
109     }
110 
111     /**
112      * @throws RejectedExecutionException {@inheritDoc}
113      * @throws NullPointerException       {@inheritDoc}
114      */
115     Future!(T) submit(T)(Runnable task, T result) {
116         if (task is null) throw new NullPointerException();
117         RunnableFuture!(T) ftask = newTaskFor!(T)(task, result);
118         execute(ftask);
119         return ftask;
120     }
121 
122     /**
123      * @throws RejectedExecutionException {@inheritDoc}
124      * @throws NullPointerException       {@inheritDoc}
125      */
126     Future!(T) submit(T)(Callable!(T) task) {
127         if (task is null) throw new NullPointerException();
128         RunnableFuture!(T) ftask = newTaskFor(task);
129         execute(ftask);
130         return ftask;
131     }
132 
133     /**
134      * the main mechanics of invokeAny.
135      */
136     private T doInvokeAny(T)(Collection!(Callable!(T)) tasks,
137                               bool timed, long nanos) {
138         if (tasks is null)
139             throw new NullPointerException();
140         int ntasks = tasks.size();
141         if (ntasks == 0)
142             throw new IllegalArgumentException();
143         ArrayList!(Future!(T)) futures = new ArrayList!(Future!(T))(ntasks);
144         ExecutorCompletionService!(T) ecs =
145             new ExecutorCompletionService!(T)(this);
146 
147         // For efficiency, especially in executors with limited
148         // parallelism, check to see if previously submitted tasks are
149         // done before submitting more of them. This interleaving
150         // plus the exception mechanics account for messiness of main
151         // loop.
152 
153         try {
154             // Record exceptions so that if we fail to obtain any
155             // result, we can throw the last exception we got.
156             ExecutionException ee = null;
157             long deadline = timed ? Clock.currStdTime() + nanos : 0L;
158             Iterator!(Callable!(T)) it = tasks.iterator();
159 
160             // Start one task for sure; the rest incrementally
161             futures.add(ecs.submit(it.next()));
162             --ntasks;
163             int active = 1;
164 
165             for (;;) {
166                 Future!(T) f = ecs.poll();
167                 if (f is null) {
168                     if (ntasks > 0) {
169                         --ntasks;
170                         futures.add(ecs.submit(it.next()));
171                         ++active;
172                     }
173                     else if (active == 0)
174                         break;
175                     else if (timed) {
176                         f = ecs.poll(nanos, NANOSECONDS);
177                         if (f is null)
178                             throw new TimeoutException();
179                         nanos = deadline - Clock.currStdTime();
180                     }
181                     else
182                         f = ecs.take();
183                 }
184                 if (f !is null) {
185                     --active;
186                     try {
187                         return f.get();
188                     } catch (ExecutionException eex) {
189                         ee = eex;
190                     } catch (RuntimeException rex) {
191                         ee = new ExecutionException(rex);
192                     }
193                 }
194             }
195 
196             if (ee is null)
197                 ee = new ExecutionException();
198             throw ee;
199 
200         } finally {
201             cancelAll(futures);
202         }
203     }
204 
205     T invokeAny(T)(Collection!(Callable!(T)) tasks) {
206         try {
207             return doInvokeAny(tasks, false, 0);
208         } catch (TimeoutException cannotHappen) {
209             assert(false);
210             return null;
211         }
212     }
213 
214     T invokeAny(T)(Collection!(Callable!(T)) tasks,
215                            Duration timeout) {
216         return doInvokeAny(tasks, true, unit.toNanos(timeout));
217     }
218 
219     List!(Future!(T)) invokeAll(T)(Collection!(Callable!(T)) tasks) {
220         if (tasks is null)
221             throw new NullPointerException();
222         ArrayList!(Future!(T)) futures = new ArrayList!(Future!(T))(tasks.size());
223         try {
224             foreach (Callable!(T) t ; tasks) {
225                 RunnableFuture!(T) f = newTaskFor(t);
226                 futures.add(f);
227                 execute(f);
228             }
229             for (int i = 0, size = futures.size(); i < size; i++) {
230                 Future!(T) f = futures.get(i);
231                 if (!f.isDone()) {
232                     try { f.get(); }
233                     catch (CancellationException ex) {
234                         version(HUNT_DEBUG) warning(ex.message());
235                     }
236                     catch (ExecutionException) {
237                         version(HUNT_DEBUG) warning(ex.message());
238                     }
239                 }
240             }
241             return futures;
242         } catch (Throwable t) {
243             cancelAll(futures);
244             throw t;
245         }
246     }
247 
248     List!(Future!(T)) invokeAll(T)(Collection!(Callable!(T)) tasks,
249                                          Duration timeout)  {
250         if (tasks is null)
251             throw new NullPointerException();
252         long nanos = timeout.total!(TimeUnit.HectoNanosecond)();
253         long deadline = Clock.currStdTime + nanos;
254         ArrayList!(Future!(T)) futures = new ArrayList!(Future!(T))(tasks.size());
255         int j = 0;
256 
257         timedOut: 
258         try {
259             foreach (Callable!(T) t ; tasks)
260                 futures.add(newTaskFor(t));
261 
262             final int size = futures.size();
263 
264             // Interleave time checks and calls to execute in case
265             // executor doesn't have any/much parallelism.
266             for (int i = 0; i < size; i++) {
267                 if (((i == 0) ? nanos : deadline - Clock.currStdTime) <= 0L)
268                     break timedOut;
269                 execute(cast(Runnable)futures.get(i));
270             }
271 
272             for (; j < size; j++) {
273                 Future!(T) f = futures.get(j);
274                 if (!f.isDone()) {
275                     try { f.get( Duration(deadline - Clock.currStdTime)); }
276                     catch (CancellationException ex) {
277                         version(HUNT_DEBUG) warning(ex.message());
278                     }
279                     catch (ExecutionException ex) {
280                         version(HUNT_DEBUG) warning(ex.message());
281                     }
282                     catch (TimeoutException ex) {
283                         version(HUNT_DEBUG) warning(ex.message());
284                         break timedOut;
285                     }
286                 }
287             }
288             return futures;
289         } catch (Throwable t) {
290             cancelAll(futures);
291             throw t;
292         }
293         // Timed out before all the tasks could be completed; cancel remaining
294         cancelAll(futures, j);
295         return futures;
296     }
297 
298     private static void cancelAll(T)(ArrayList!(Future!(T)) futures) {
299         cancelAll(futures, 0);
300     }
301 
302     /** Cancels all futures with index at least j. */
303     private static void cancelAll(T)(ArrayList!(Future!(T)) futures, int j) {
304         for (int size = futures.size(); j < size; j++)
305             futures.get(j).cancel(true);
306     }
307 }