/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.concurrency.AbstractExecutorService;

import hunt.concurrency.ExecutorService;
import hunt.concurrency.Future;
import hunt.concurrency.FutureTask;

import hunt.collection.ArrayList;
import hunt.collection.Collection;
import hunt.collection.Iterator;
import hunt.collection.List;

import hunt.Exceptions;
import hunt.logging;
import hunt.util.Common;
import hunt.util.DateTime;
import hunt.util.Runnable;

import std.datetime;


/**
 * Provides default implementations of {@link ExecutorService}
 * execution methods. This class implements the {@code submit},
 * {@code invokeAny} and {@code invokeAll} methods using a
 * {@link RunnableFuture} returned by {@code newTaskFor}, which defaults
 * to the {@link FutureTask} class provided in this package. For example,
 * the implementation of {@code submit(Runnable)} creates an
 * associated {@code RunnableFuture} that is executed and
 * returned.
 *
 * <p><b>Extension example</b>. Here is a sketch of a class
 * that customizes {@link ThreadPoolExecutor} to use
 * a {@code CustomTask} class instead of the default {@code FutureTask}:
 * <pre> {@code
 * class CustomThreadPoolExecutor : ThreadPoolExecutor {
 *
 *   static class CustomTask(V) : RunnableFuture!(V) {...}
 *
 *   static RunnableFuture!(V) newTaskFor(V)(Callable!(V) c) {
 *       return new CustomTask!(V)(c);
 *   }
 *   static RunnableFuture!(V) newTaskFor(V)(Runnable r, V v) {
 *       return new CustomTask!(V)(r, v);
 *   }
 *   // ... add constructors, etc.
 * }}</pre>
 *
 * NOTE(review): in this module the {@code newTaskFor} functions are static
 * templates, which D does not dispatch virtually, so the override-based
 * customization sketched above (inherited from the Java documentation)
 * presumably does not take effect for calls made inside this class -- confirm.
 *
 * @author Doug Lea
 */
abstract class AbstractExecutorService : ExecutorService {

    /**
     * Returns a {@code RunnableFuture} for the given runnable and default
     * value.
     *
     * @param runnable the runnable task being wrapped
     * @param value the default value for the returned future
     * @param (T) the type of the given value (any type except void)
     * @return a {@code FutureTask} constructed from the runnable and the value
     */
    static RunnableFuture!(T) newTaskFor(T)(Runnable runnable, T value) if(!is(T == void)) {
        return new FutureTask!(T)(runnable, value);
    }

    /**
     * Returns a {@code RunnableFuture} for the given runnable when no result
     * value is involved ({@code T} is void).
     *
     * @param runnable the runnable task being wrapped
     * @return a {@code FutureTask!(void)} constructed from the runnable
     */
    static RunnableFuture!(T) newTaskFor(T)(Runnable runnable) if(is(T == void)) {
        return new FutureTask!(T)(runnable);
    }

    /**
     * Returns a {@code RunnableFuture} for the given callable task.
     *
     * @param callable the callable task being wrapped
     * @param (T) the type of the callable's result
     * @return a {@code FutureTask} constructed from the callable
     */
    static RunnableFuture!(T) newTaskFor(T)(Callable!(T) callable) {
        return new FutureTask!(T)(callable);
    }

    /**
     * Wraps the runnable in a void future, hands it to {@code execute} and
     * returns the future.
     *
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException if {@code task} is null
     */
    Future!(void) submit(Runnable task) {
        if (task is null)
            throw new NullPointerException();
        RunnableFuture!(void) ftask = newTaskFor!(void)(task);
        execute(ftask);
        return ftask;
    }

    /**
     * Wraps the runnable and its result value in a future, hands it to
     * {@code execute} and returns the future.
     *
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException if {@code task} is null
     */
    Future!(T) submit(T)(Runnable task, T result) {
        if (task is null)
            throw new NullPointerException();
        RunnableFuture!(T) ftask = newTaskFor!(T)(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * Wraps the callable in a future, hands it to {@code execute} and
     * returns the future.
     *
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException if {@code task} is null
     */
    Future!(T) submit(T)(Callable!(T) task) {
        if (task is null)
            throw new NullPointerException();
        RunnableFuture!(T) ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    /**
     * The main mechanics of invokeAny.
     *
     * @param tasks the candidate tasks; must be non-null and non-empty
     * @param timed whether the wait is bounded by {@code nanos}
     * @param nanos the time budget in nanoseconds (only read when {@code timed})
     * @return the value of the first future whose {@code get()} returns normally
     * @throws NullPointerException if {@code tasks} is null
     * @throws IllegalArgumentException if {@code tasks} is empty
     * @throws TimeoutException if {@code timed} and a timed poll yields no future
     * @throws ExecutionException if no task produced a result
     */
    private T doInvokeAny(T)(Collection!(Callable!(T)) tasks,
                              bool timed, long nanos) {
        if (tasks is null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        ArrayList!(Future!(T)) futures = new ArrayList!(Future!(T))(ntasks);
        // NOTE(review): ExecutorCompletionService does not appear among this
        // module's imports -- confirm where it is expected to come from.
        ExecutorCompletionService!(T) ecs =
            new ExecutorCompletionService!(T)(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;

            // The deadline is tracked with MonoTime so that the nanosecond
            // budget is never mixed with Clock.currStdTime's hnsecs ticks.
            MonoTime deadline;
            if (timed)
                deadline = MonoTime.currTime + nsecs(nanos);

            Iterator!(Callable!(T)) it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
                Future!(T) f = ecs.poll();
                if (f is null) {
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        break;
                    else if (timed) {
                        // NOTE(review): NANOSECONDS is not declared in this
                        // module -- confirm the intended poll() signature.
                        f = ecs.poll(nanos, NANOSECONDS);
                        if (f is null)
                            throw new TimeoutException();
                        // Shrink the budget to whatever is left until the deadline.
                        nanos = (deadline - MonoTime.currTime).total!"nsecs";
                    }
                    else
                        f = ecs.take();
                }
                if (f !is null) {
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee is null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            // Runs on both the success and the failure path.
            cancelAll(futures);
        }
    }

    /**
     * Runs the given tasks through {@code doInvokeAny} without a time limit
     * and returns the first successfully obtained result.
     *
     * @param tasks the candidate tasks
     * @return the result produced by {@code doInvokeAny}
     */
    T invokeAny(T)(Collection!(Callable!(T)) tasks) {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            // doInvokeAny's explicit TimeoutException is only thrown on its
            // timed path. assert(false) never returns, so no dummy return
            // value is needed (and none would exist for value types).
            assert(false);
        }
    }

    /**
     * Runs the given tasks through {@code doInvokeAny} with a time limit
     * and returns the first successfully obtained result.
     *
     * @param tasks the candidate tasks
     * @param timeout the maximum time to wait, converted to nanoseconds
     * @return the result produced by {@code doInvokeAny}
     */
    T invokeAny(T)(Collection!(Callable!(T)) tasks,
                   Duration timeout) {
        return doInvokeAny(tasks, true, timeout.total!"nsecs"());
    }

    /**
     * Executes every task and waits for each resulting future.
     * Cancellation and execution failures of individual tasks are not
     * propagated (they are logged when built with HUNT_DEBUG). Any other
     * throwable cancels all futures and is rethrown.
     *
     * @param tasks the tasks to execute
     * @return the futures, in the iteration order of {@code tasks}
     * @throws NullPointerException if {@code tasks} is null
     */
    List!(Future!(T)) invokeAll(T)(Collection!(Callable!(T)) tasks) {
        if (tasks is null)
            throw new NullPointerException();
        ArrayList!(Future!(T)) futures = new ArrayList!(Future!(T))(tasks.size());
        try {
            foreach (Callable!(T) t ; tasks) {
                RunnableFuture!(T) f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future!(T) f = futures.get(i);
                if (!f.isDone()) {
                    try { f.get(); }
                    catch (CancellationException ex) {
                        version(HUNT_DEBUG) warning(ex.message());
                    }
                    catch (ExecutionException ex) {
                        version(HUNT_DEBUG) warning(ex.message());
                    }
                }
            }
            return futures;
        } catch (Throwable t) {
            cancelAll(futures);
            throw t;
        }
    }

    /**
     * Executes every task and waits for each resulting future until the
     * timeout elapses. When the deadline passes, the futures from the one
     * currently being waited on onward are cancelled and the full list is
     * returned. Cancellation and execution failures of individual tasks are
     * not propagated (they are logged when built with HUNT_DEBUG). Any other
     * throwable cancels all futures and is rethrown.
     *
     * @param tasks the tasks to execute
     * @param timeout the maximum total time to wait
     * @return the futures, in the iteration order of {@code tasks}
     * @throws NullPointerException if {@code tasks} is null
     */
    List!(Future!(T)) invokeAll(T)(Collection!(Callable!(T)) tasks,
                                   Duration timeout) {
        if (tasks is null)
            throw new NullPointerException();
        // Both values are in hnsecs, the unit of Clock.currStdTime.
        long timeoutHnsecs = timeout.total!(TimeUnit.HectoNanosecond)();
        long deadline = Clock.currStdTime + timeoutHnsecs;
        ArrayList!(Future!(T)) futures = new ArrayList!(Future!(T))(tasks.size());
        int j = 0;
        // D cannot break out of a labelled try block, so the timeout is
        // carried in a flag and ordinary loop breaks are used instead.
        bool timedOut = false;

        try {
            foreach (Callable!(T) t ; tasks)
                futures.add(newTaskFor(t));

            const int size = futures.size();

            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism.
            for (int i = 0; i < size; i++) {
                long remaining = (i == 0) ? timeoutHnsecs : deadline - Clock.currStdTime;
                if (remaining <= 0L) {
                    timedOut = true;
                    break;
                }
                execute(cast(Runnable)futures.get(i));
            }

            if (!timedOut) {
                for (; j < size; j++) {
                    Future!(T) f = futures.get(j);
                    if (!f.isDone()) {
                        try { f.get(dur!"hnsecs"(deadline - Clock.currStdTime)); }
                        catch (CancellationException ex) {
                            version(HUNT_DEBUG) warning(ex.message());
                        }
                        catch (ExecutionException ex) {
                            version(HUNT_DEBUG) warning(ex.message());
                        }
                        catch (TimeoutException ex) {
                            version(HUNT_DEBUG) warning(ex.message());
                            timedOut = true;
                            break;
                        }
                    }
                }
            }

            if (!timedOut)
                return futures;
        } catch (Throwable t) {
            cancelAll(futures);
            throw t;
        }
        // Timed out before all the tasks could be completed; cancel remaining
        cancelAll(futures, j);
        return futures;
    }

    /** Cancels every future in the list. */
    private static void cancelAll(T)(ArrayList!(Future!(T)) futures) {
        cancelAll(futures, 0);
    }

    /** Cancels all futures with index at least j. */
    private static void cancelAll(T)(ArrayList!(Future!(T)) futures, int j) {
        for (int size = futures.size(); j < size; j++)
            futures.get(j).cancel(true);
    }
}