1 /* 2 * Hunt - A refined core library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.concurrency.Executors; 13 14 import hunt.concurrency.AbstractExecutorService; 15 import hunt.concurrency.atomic.AtomicHelper; 16 import hunt.concurrency.Delayed; 17 import hunt.concurrency.Exceptions; 18 import hunt.concurrency.ExecutorService; 19 // import hunt.concurrency.ForkJoinPool; 20 import hunt.concurrency.Future; 21 import hunt.concurrency.LinkedBlockingQueue; 22 import hunt.concurrency.ScheduledExecutorService; 23 import hunt.concurrency.ScheduledThreadPoolExecutor; 24 import hunt.concurrency.ThreadFactory; 25 import hunt.concurrency.ThreadPoolExecutor; 26 27 import hunt.collection.List; 28 import hunt.Exceptions; 29 import hunt.logging; 30 import hunt.util.Common; 31 import hunt.util.CompilerHelper; 32 import hunt.util.DateTime; 33 import hunt.util.Runnable; 34 35 36 static if(CompilerHelper.isGreaterThan(2093)) { 37 import core.thread.osthread; 38 } else { 39 import core.thread; 40 } 41 42 import core.time; 43 import std.conv; 44 45 /** 46 * Factory and utility methods for {@link Executor}, {@link 47 * ExecutorService}, {@link ScheduledExecutorService}, {@link 48 * ThreadFactory}, and {@link Callable} classes defined in this 49 * package. This class supports the following kinds of methods: 50 * 51 * <ul> 52 * <li>Methods that create and return an {@link ExecutorService} 53 * set up with commonly useful configuration settings. 54 * <li>Methods that create and return a {@link ScheduledExecutorService} 55 * set up with commonly useful configuration settings. 56 * <li>Methods that create and return a "wrapped" ExecutorService, that 57 * disables reconfiguration by making implementation-specific methods 58 * inaccessible. 
* <li>Methods that create and return a {@link ThreadFactory}
 *     that sets newly created threads to a known state.
 * <li>Methods that create and return a {@link Callable}
 *     out of other closure-like forms, so they can be used
 *     in execution methods requiring {@code Callable}.
 * </ul>
 *
 * @author Doug Lea
 */
class Executors {

    /**
     * Builds a fixed-size thread pool fed by one shared, unbounded
     * {@link LinkedBlockingQueue}. Core size and maximum size are both
     * {@code nThreads} and the keep-alive duration is zero, so at most
     * {@code nThreads} threads are active at any point. Tasks submitted
     * while every thread is busy wait in the queue until a thread is
     * available. If a thread terminates due to a failure during
     * execution prior to shutdown, a new one takes its place if needed
     * to execute subsequent tasks. The threads in the pool exist until
     * it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    static ThreadPoolExecutor newFixedThreadPool(int nThreads) {
        auto workQueue = new LinkedBlockingQueue!(Runnable)();
        return new ThreadPoolExecutor(nThreads, nThreads, Duration.zero, workQueue);
    }

    // /**
    //  * Creates a thread pool that maintains enough threads to support
    //  * the given parallelism level, and may use multiple queues to
    //  * reduce contention. The parallelism level corresponds to the
    //  * maximum number of threads actively engaged in, or available to
    //  * engage in, task processing. The actual number of threads may
    //  * grow and shrink dynamically. A work-stealing pool makes no
    //  * guarantees about the order in which submitted tasks are
    //  * executed.
99 // * 100 // * @param parallelism the targeted parallelism level 101 // * @return the newly created thread pool 102 // * @throws IllegalArgumentException if {@code parallelism <= 0} 103 // */ 104 // static ExecutorService newWorkStealingPool(int parallelism) { 105 // return new ForkJoinPool 106 // (parallelism, 107 // ForkJoinPool.defaultForkJoinWorkerThreadFactory, 108 // null, true); 109 // } 110 111 // /** 112 // * Creates a work-stealing thread pool using the number of 113 // * {@linkplain Runtime#availableProcessors available processors} 114 // * as its target parallelism level. 115 // * 116 // * @return the newly created thread pool 117 // * @see #newWorkStealingPool(int) 118 // */ 119 // static ExecutorService newWorkStealingPool() { 120 // return new ForkJoinPool 121 // (Runtime.getRuntime().availableProcessors(), 122 // ForkJoinPool.defaultForkJoinWorkerThreadFactory, 123 // null, true); 124 // } 125 126 /** 127 * Creates a thread pool that reuses a fixed number of threads 128 * operating off a shared unbounded queue, using the provided 129 * ThreadFactory to create new threads when needed. At any point, 130 * at most {@code nThreads} threads will be active processing 131 * tasks. If additional tasks are submitted when all threads are 132 * active, they will wait in the queue until a thread is 133 * available. If any thread terminates due to a failure during 134 * execution prior to shutdown, a new one will take its place if 135 * needed to execute subsequent tasks. The threads in the pool will 136 * exist until it is explicitly {@link ExecutorService#shutdown 137 * shutdown}. 
*
     * @param nThreads the number of threads in the pool (used as both
     *        the core and the maximum pool size)
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     * @throws NullPointerException if threadFactory is null
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        auto workQueue = new LinkedBlockingQueue!(Runnable)();
        return new ThreadPoolExecutor(nThreads, nThreads, Duration.zero,
                                      workQueue, threadFactory);
    }

    /**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue. (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * {@code newFixedThreadPool(1)} the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     *
     * @return the newly created single-threaded Executor
     */
    // static ExecutorService newSingleThreadExecutor() {
    //     return new FinalizableDelegatedExecutorService
    //         (new ThreadPoolExecutor(1, 1,
    //                                 0L, TimeUnit.MILLISECONDS,
    //                                 new LinkedBlockingQueue!(Runnable)()));
    // }

    // /**
    //  * Creates an Executor that uses a single worker thread operating
    //  * off an unbounded queue, and uses the provided ThreadFactory to
    //  * create a new thread when needed. Unlike the otherwise
    //  * equivalent {@code newFixedThreadPool(1, threadFactory)} the
    //  * returned executor is guaranteed not to be reconfigurable to use
    //  * additional threads.
178 // * 179 // * @param threadFactory the factory to use when creating new threads 180 // * @return the newly created single-threaded Executor 181 // * @throws NullPointerException if threadFactory is null 182 // */ 183 // static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { 184 // return new FinalizableDelegatedExecutorService 185 // (new ThreadPoolExecutor(1, 1, 186 // 0L, TimeUnit.MILLISECONDS, 187 // new LinkedBlockingQueue!(Runnable)(), 188 // threadFactory)); 189 // } 190 191 /** 192 * Creates a thread pool that creates new threads as needed, but 193 * will reuse previously constructed threads when they are 194 * available. These pools will typically improve the performance 195 * of programs that execute many short-lived asynchronous tasks. 196 * Calls to {@code execute} will reuse previously constructed 197 * threads if available. If no existing thread is available, a new 198 * thread will be created and added to the pool. Threads that have 199 * not been used for sixty seconds are terminated and removed from 200 * the cache. Thus, a pool that remains idle for long enough will 201 * not consume any resources. Note that pools with similar 202 * properties but different details (for example, timeout parameters) 203 * may be created using {@link ThreadPoolExecutor} constructors. 204 * 205 * @return the newly created thread pool 206 */ 207 // static ExecutorService newCachedThreadPool() { 208 // return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 209 // 60L, TimeUnit.SECONDS, 210 // new SynchronousQueue!(Runnable)()); 211 // } 212 213 // /** 214 // * Creates a thread pool that creates new threads as needed, but 215 // * will reuse previously constructed threads when they are 216 // * available, and uses the provided 217 // * ThreadFactory to create new threads when needed. 
// *
    //  * @param threadFactory the factory to use when creating new threads
    //  * @return the newly created thread pool
    //  * @throws NullPointerException if threadFactory is null
    //  */
    // static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    //     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    //                                   60L, TimeUnit.SECONDS,
    //                                   new SynchronousQueue!(Runnable)(),
    //                                   threadFactory);
    // }

    /**
     * Creates a single-threaded executor that can schedule commands
     * to run after a given delay, or to execute periodically.
     * The result is a {@code ScheduledThreadPoolExecutor} with a core
     * pool size of 1, wrapped in a
     * {@code DelegatedScheduledExecutorService}.
     * (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * {@code newScheduledThreadPool(1)} the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     *
     * @return the newly created scheduled executor
     */
    static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        auto scheduler = new ScheduledThreadPoolExecutor(1);
        return new DelegatedScheduledExecutorService!(ScheduledThreadPoolExecutor)(scheduler);
    }

    /**
     * Creates a single-threaded executor that can schedule commands
     * to run after a given delay, or to execute periodically.  (Note
     * however that if this single thread terminates due to a failure
     * during execution prior to shutdown, a new one will take its
     * place if needed to execute subsequent tasks.)  Tasks are
     * guaranteed to execute sequentially, and no more than one task
     * will be active at any given time. Unlike the otherwise
     * equivalent {@code newScheduledThreadPool(1, threadFactory)}
     * the returned executor is guaranteed not to be reconfigurable to
     * use additional threads.
*
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created scheduled executor
     * @throws NullPointerException if threadFactory is null
     */
    static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        auto scheduler = new ScheduledThreadPoolExecutor(1, threadFactory);
        return new DelegatedScheduledExecutorService!(ScheduledThreadPoolExecutor)(scheduler);
    }

    /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically. The pool is returned
     * as a plain {@code ScheduledThreadPoolExecutor}, without a
     * delegating wrapper.
     * @param corePoolSize the number of threads to keep in the pool,
     *        even if they are idle
     * @return the newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        ScheduledExecutorService pool = new ScheduledThreadPoolExecutor(corePoolSize);
        return pool;
    }

    /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically. The pool is returned
     * as a plain {@code ScheduledThreadPoolExecutor}, without a
     * delegating wrapper.
     * @param corePoolSize the number of threads to keep in the pool,
     *        even if they are idle
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @return the newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if threadFactory is null
     */
    static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        ScheduledExecutorService pool =
            new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
        return pool;
    }

    // /**
    //  * Returns an object that delegates all defined {@link
    //  * ExecutorService} methods to the given executor, but not any
    //  * other methods that might otherwise be accessible using
    //  * casts. This provides a way to safely "freeze" configuration and
    //  * disallow tuning of a given concrete implementation.
304 // * @param executor the underlying implementation 305 // * @return an {@code ExecutorService} instance 306 // * @throws NullPointerException if executor null 307 // */ 308 // static ExecutorService unconfigurableExecutorService(ExecutorService executor) { 309 // if (executor is null) 310 // throw new NullPointerException(); 311 // return new DelegatedExecutorService(executor); 312 // } 313 314 // /** 315 // * Returns an object that delegates all defined {@link 316 // * ScheduledExecutorService} methods to the given executor, but 317 // * not any other methods that might otherwise be accessible using 318 // * casts. This provides a way to safely "freeze" configuration and 319 // * disallow tuning of a given concrete implementation. 320 // * @param executor the underlying implementation 321 // * @return a {@code ScheduledExecutorService} instance 322 // * @throws NullPointerException if executor null 323 // */ 324 // static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) { 325 // if (executor is null) 326 // throw new NullPointerException(); 327 // return new DelegatedScheduledExecutorService(executor); 328 // } 329 330 /** 331 * Returns a default thread factory used to create new threads. 332 * This factory creates all new threads used by an Executor in the 333 * same {@link ThreadGroupEx}. If there is a {@link 334 * java.lang.SecurityManager}, it uses the group of {@link 335 * System#getSecurityManager}, else the group of the thread 336 * invoking this {@code defaultThreadFactory} method. Each new 337 * thread is created as a non-daemon thread with priority set to 338 * the smaller of {@code Thread.PRIORITY_DEFAULT} and the maximum 339 * priority permitted in the thread group. 
* New threads have names
     * accessible via {@link Thread#getName} of
     * <em>pool-N-thread-M</em>, where <em>N</em> is the sequence
     * number of this factory, and <em>M</em> is the sequence number
     * of the thread created by this factory.
     *
     * This method simply forwards to
     * {@code ThreadFactory.defaultThreadFactory()}.
     * @return a thread factory
     */
    static ThreadFactory defaultThreadFactory() {
        ThreadFactory factory = ThreadFactory.defaultThreadFactory();
        return factory;
    }

    // /**
    //  * Returns a thread factory used to create new threads that
    //  * have the same permissions as the current thread.
    //  * This factory creates threads with the same settings as {@link
    //  * Executors#defaultThreadFactory}, additionally setting the
    //  * AccessControlContext and contextClassLoader of new threads to
    //  * be the same as the thread invoking this
    //  * {@code privilegedThreadFactory} method.  A new
    //  * {@code privilegedThreadFactory} can be created within an
    //  * {@link AccessController#doPrivileged AccessController.doPrivileged}
    //  * action setting the current thread's access control context to
    //  * create threads with the selected permission settings holding
    //  * within that action.
    //  *
    //  * <p>Note that while tasks running within such threads will have
    //  * the same access control and class loader settings as the
    //  * current thread, they need not have the same {@link
    //  * java.lang.ThreadLocal} or {@link
    //  * java.lang.InheritableThreadLocal} values. If necessary,
    //  * particular values of thread locals can be set or reset before
    //  * any task runs in {@link ThreadPoolExecutor} subclasses using
    //  * {@link ThreadPoolExecutor#beforeExecute(Thread, Runnable)}.
    //  * Also, if it is necessary to initialize worker threads to have
    //  * the same InheritableThreadLocal settings as some other
    //  * designated thread, you can create a custom ThreadFactory in
    //  * which that thread waits for and services requests to create
    //  * others that will inherit its values.
// *
    //  * @return a thread factory
    //  * @throws AccessControlException if the current access control
    //  *         context does not have permission to both get and set context
    //  *         class loader
    //  */
    // static ThreadFactory privilegedThreadFactory() {
    //     return new PrivilegedThreadFactory();
    // }

    /**
     * Returns a {@link Callable} object that, when called, runs the
     * given task and yields no value. This can be useful when applying
     * methods requiring a {@code Callable} to an otherwise resultless
     * action.
     * @param task the task to run
     * @return a callable object wrapping {@code task}
     * @throws NullPointerException if task null
     */
    static Callable!(void) callable(Runnable task) {
        if (task !is null) {
            return new RunnableAdapter!(void)(task);
        }
        throw new NullPointerException();
    }

    /**
     * Returns a {@link Callable} object that, when called, runs the
     * given task and returns the given result. Only available for
     * non-{@code void} result types; the single-argument overload
     * covers the resultless case.
     * @param task the task to run
     * @param result the result to return
     * @param (T) the type of the result
     * @return a callable object wrapping {@code task}
     * @throws NullPointerException if task null
     */
    static Callable!(T) callable(T)(Runnable task, T result) if(!is(T == void)) {
        if (task !is null) {
            return new RunnableAdapter!(T)(task, result);
        }
        throw new NullPointerException();
    }

    /**
     * Returns a {@link Callable} object that, when
     * called, runs the given task and returns {@code null}.
     * @param task the task to run
     * @return a callable object
     * @throws NullPointerException if task null
     */
    // static Callable!(Object) callable(Runnable task) {
    //     if (task is null)
    //         throw new NullPointerException();
    //     return new RunnableAdapter!(Object)(task, null);
    // }

    // /**
    //  * Returns a {@link Callable} object that, when
    //  * called, runs the given privileged action and returns its result.
426 // * @param action the privileged action to run 427 // * @return a callable object 428 // * @throws NullPointerException if action null 429 // */ 430 // static Callable!(Object) callable(PrivilegedAction<?> action) { 431 // if (action is null) 432 // throw new NullPointerException(); 433 // return new Callable!(Object)() { 434 // Object call() { return action.run(); }}; 435 // } 436 437 // /** 438 // * Returns a {@link Callable} object that, when 439 // * called, runs the given privileged exception action and returns 440 // * its result. 441 // * @param action the privileged exception action to run 442 // * @return a callable object 443 // * @throws NullPointerException if action null 444 // */ 445 // static Callable!(Object) callable(PrivilegedExceptionAction<?> action) { 446 // if (action is null) 447 // throw new NullPointerException(); 448 // return new Callable!(Object)() { 449 // Object call() throws Exception { return action.run(); }}; 450 // } 451 452 // /** 453 // * Returns a {@link Callable} object that will, when called, 454 // * execute the given {@code callable} under the current access 455 // * control context. This method should normally be invoked within 456 // * an {@link AccessController#doPrivileged AccessController.doPrivileged} 457 // * action to create callables that will, if possible, execute 458 // * under the selected permission settings holding within that 459 // * action; or if not possible, throw an associated {@link 460 // * AccessControlException}. 
461 // * @param callable the underlying task 462 // * @param (T) the type of the callable's result 463 // * @return a callable object 464 // * @throws NullPointerException if callable null 465 // */ 466 // static !(T) Callable!(T) privilegedCallable(Callable!(T) callable) { 467 // if (callable is null) 468 // throw new NullPointerException(); 469 // return new PrivilegedCallable!(T)(callable); 470 // } 471 472 // /** 473 // * Returns a {@link Callable} object that will, when called, 474 // * execute the given {@code callable} under the current access 475 // * control context, with the current context class loader as the 476 // * context class loader. This method should normally be invoked 477 // * within an 478 // * {@link AccessController#doPrivileged AccessController.doPrivileged} 479 // * action to create callables that will, if possible, execute 480 // * under the selected permission settings holding within that 481 // * action; or if not possible, throw an associated {@link 482 // * AccessControlException}. 483 // * 484 // * @param callable the underlying task 485 // * @param (T) the type of the callable's result 486 // * @return a callable object 487 // * @throws NullPointerException if callable null 488 // * @throws AccessControlException if the current access control 489 // * context does not have permission to both set and get context 490 // * class loader 491 // */ 492 // static !(T) Callable!(T) privilegedCallableUsingCurrentClassLoader(Callable!(T) callable) { 493 // if (callable is null) 494 // throw new NullPointerException(); 495 // return new PrivilegedCallableUsingCurrentClassLoader!(T)(callable); 496 // } 497 498 499 // Methods for ExecutorService 500 501 /** 502 * Submits a Runnable task for execution and returns a Future 503 * representing that task. The Future's {@code get} method will 504 * return {@code null} upon <em>successful</em> completion. 
*
     * @param es the executor to submit to; it must be an
     *        {@code AbstractExecutorService}, because the call is
     *        forwarded to that class's {@code submit}
     * @param task the task to submit
     * @return a Future representing pending completion of the task
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution, or if {@code es} is null or is
     *         not an {@code AbstractExecutorService}
     * @throws NullPointerException if the task is null
     */
    static Future!(void) submit(ExecutorService es, Runnable task) {
        // The cast yields null both for a null reference and for an
        // executor that is not an AbstractExecutorService.
        auto impl = cast(AbstractExecutorService) es;
        if (impl !is null) {
            return impl.submit(task);
        }
        throw new RejectedExecutionException("ExecutorService is null");

        // TypeInfo typeInfo = typeid(cast(Object)es);
        // if(typeInfo == typeid(ThreadPoolExecutor)) {
        //     AbstractExecutorService aes = cast(AbstractExecutorService)es;
        //     return aes.submit(task);
        // } else {
        //     implementationMissing(false);
        // }
    }

    /**
     * Submits a Runnable task for execution and returns a Future
     * representing that task. The Future's {@code get} method will
     * return the given result upon successful completion.
*
     * @param es the executor to submit to; it must be an
     *        {@code AbstractExecutorService}, because the call is
     *        forwarded to that class's {@code submit}
     * @param task the task to submit
     * @param result the result to return
     * @param (T) the type of the result
     * @return a Future representing pending completion of the task
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution, or if {@code es} is null or is
     *         not an {@code AbstractExecutorService}
     * @throws NullPointerException if the task is null
     */
    static Future!(T) submit(T)(ExecutorService es, Runnable task, T result) {
        // The cast yields null both for a null reference and for an
        // executor that is not an AbstractExecutorService.
        auto impl = cast(AbstractExecutorService) es;
        if (impl !is null) {
            return impl.submit!T(task, result);
        }
        throw new RejectedExecutionException("ExecutorService is null");

        // TypeInfo typeInfo = typeid(cast(Object)es);
        // if(typeInfo == typeid(ThreadPoolExecutor)) {
        //     AbstractExecutorService aes = cast(AbstractExecutorService)es;
        //     if(aes is null)
        //         throw new RejectedExecutionException("ExecutorService is null");
        //     else
        //         return aes.submit!T(task, result);
        // } else {
        //     implementationMissing(false);
        // }
    }

    /**
     * Submits a value-returning task for execution and returns a
     * Future representing the pending results of the task. The
     * Future's {@code get} method will return the task's result upon
     * successful completion.
     *
     * <p>
     * If you would like to immediately block waiting
     * for a task, you can use constructions of the form
     * {@code result = exec.submit(aCallable).get();}
     *
     * <p>Note: The {@link Executors} class includes a set of methods
     * that can convert some other common closure-like objects,
     * for example, {@link java.security.PrivilegedAction} to
     * {@link Callable} form so they can be submitted.
*
     * @param es the executor to submit to; it must be an
     *        {@code AbstractExecutorService}, because the call is
     *        forwarded to that class's {@code submit}
     * @param task the task to submit
     * @param (T) the type of the task's result
     * @return a Future representing pending completion of the task
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution, or if {@code es} is null or is
     *         not an {@code AbstractExecutorService}
     * @throws NullPointerException if the task is null
     */
    static Future!(T) submit(T)(ExecutorService es, Callable!(T) task) {
        // The cast yields null both for a null reference and for an
        // executor that is not an AbstractExecutorService.
        auto impl = cast(AbstractExecutorService) es;
        if (impl !is null) {
            return impl.submit!(T)(task);
        }
        throw new RejectedExecutionException("ExecutorService is null");

        // TypeInfo typeInfo = typeid(cast(Object)es);
        // if(typeInfo == typeid(ThreadPoolExecutor)) {
        //     AbstractExecutorService aes = cast(AbstractExecutorService)es;
        //     if(aes is null)
        //         throw new RejectedExecutionException("ExecutorService is null");
        //     else
        //         return aes.submit!(T)(task);
        // } else {
        //     implementationMissing(false);
        // }
    }

    /**
     * Executes the given tasks, returning a list of Futures holding
     * their status and results when all complete.
     * {@link Future#isDone} is {@code true} for each
     * element of the returned list.
     * Note that a <em>completed</em> task could have
     * terminated either normally or by throwing an exception.
     * The results of this method are undefined if the given
     * collection is modified while this operation is in progress.
*
     * @param es the executor that runs the tasks; it must be an
     *        {@code AbstractExecutorService}, because the call is
     *        forwarded to that class's {@code invokeAll}
     * @param tasks the collection of tasks
     * @param (T) the type of the values returned from the tasks
     * @return a list of Futures representing the tasks, in the same
     *         sequential order as produced by the iterator for the
     *         given task list, each of which has completed
     * @throws InterruptedException if interrupted while waiting, in
     *         which case unfinished tasks are cancelled
     * @throws NullPointerException if tasks or any of its elements are {@code null}
     * @throws RejectedExecutionException if any task cannot be
     *         scheduled for execution, or if {@code es} is null or is
     *         not an {@code AbstractExecutorService}
     */
    static List!(Future!(T)) invokeAll(T)(ExecutorService es, Collection!(Callable!(T)) tasks) {
        // The cast yields null both for a null reference and for an
        // executor that is not an AbstractExecutorService.
        AbstractExecutorService aes = cast(AbstractExecutorService)es;
        if(aes is null)
            throw new RejectedExecutionException("ExecutorService is null");

        // Hand the delegate's futures back to the caller. The previous
        // code dropped them, leaving this non-void function without a
        // return value.
        return aes.invokeAll!(T)(tasks);
    }

    /**
     * Executes the given tasks, returning a list of Futures holding
     * their status and results
     * when all complete or the timeout expires, whichever happens first.
     * {@link Future#isDone} is {@code true} for each
     * element of the returned list.
     * Upon return, tasks that have not completed are cancelled.
     * Note that a <em>completed</em> task could have
     * terminated either normally or by throwing an exception.
     * The results of this method are undefined if the given
     * collection is modified while this operation is in progress.
     *
     * @param tasks the collection of tasks
     * @param timeout the maximum time to wait
     * @param (T) the type of the values returned from the tasks
     * @return a list of Futures representing the tasks, in the same
     *         sequential order as produced by the iterator for the
     *         given task list. If the operation did not time out,
     *         each task will have completed. If it did time out, some
     *         of these tasks will not have completed.
* @throws InterruptedException if interrupted while waiting, in
     *         which case unfinished tasks are cancelled
     * @throws NullPointerException if tasks or any of its elements
     *         are {@code null}
     * @throws RejectedExecutionException if any task cannot be scheduled
     *         for execution, or if {@code es} is null or is not an
     *         {@code AbstractExecutorService}
     */
    static List!(Future!(T)) invokeAll(T)(ExecutorService es, Collection!(Callable!(T)) tasks,
                                  Duration timeout) {
        // The cast yields null both for a null reference and for an
        // executor that is not an AbstractExecutorService.
        AbstractExecutorService aes = cast(AbstractExecutorService)es;
        if(aes is null)
            throw new RejectedExecutionException("ExecutorService is null");

        // Hand the delegate's futures back to the caller. The previous
        // code dropped them, leaving this non-void function without a
        // return value.
        return aes.invokeAll!(T)(tasks, timeout);
    }

    /**
     * Executes the given tasks, returning the result
     * of one that has completed successfully (i.e., without throwing
     * an exception), if any do. Upon normal or exceptional return,
     * tasks that have not completed are cancelled.
     * The results of this method are undefined if the given
     * collection is modified while this operation is in progress.
*
     * @param es the executor that runs the tasks; it must be an
     *        {@code AbstractExecutorService}, because the call is
     *        forwarded to that class's {@code invokeAny}
     * @param tasks the collection of tasks
     * @param (T) the type of the values returned from the tasks
     * @return the result returned by one of the tasks
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if tasks or any element task
     *         subject to execution is {@code null}
     * @throws IllegalArgumentException if tasks is empty
     * @throws ExecutionException if no task successfully completes
     * @throws RejectedExecutionException if tasks cannot be scheduled
     *         for execution, or if {@code es} is null or is not an
     *         {@code AbstractExecutorService}
     */
    static T invokeAny(T)(ExecutorService es, Collection!(Callable!(T)) tasks) {
        // The cast yields null both for a null reference and for an
        // executor that is not an AbstractExecutorService.
        AbstractExecutorService aes = cast(AbstractExecutorService)es;
        if(aes is null)
            throw new RejectedExecutionException("ExecutorService is null");

        // Hand the winning task's result back to the caller. The
        // previous code dropped it, leaving this non-void function
        // without a return value.
        return aes.invokeAny!(T)(tasks);
    }

    /**
     * Executes the given tasks, returning the result
     * of one that has completed successfully (i.e., without throwing
     * an exception), if any do before the given timeout elapses.
     * Upon normal or exceptional return, tasks that have not
     * completed are cancelled.
     * The results of this method are undefined if the given
     * collection is modified while this operation is in progress.
*
     * @param es the executor that runs the tasks; it must be an
     *        {@code AbstractExecutorService}, because the call is
     *        forwarded to that class's {@code invokeAny}
     * @param tasks the collection of tasks
     * @param timeout the maximum time to wait
     * @param (T) the type of the values returned from the tasks
     * @return the result returned by one of the tasks
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if tasks or any element
     *         task subject to execution is {@code null}
     * @throws TimeoutException if the given timeout elapses before
     *         any task successfully completes
     * @throws ExecutionException if no task successfully completes
     * @throws RejectedExecutionException if tasks cannot be scheduled
     *         for execution, or if {@code es} is null or is not an
     *         {@code AbstractExecutorService}
     */
    static T invokeAny(T)(ExecutorService es, Collection!(Callable!(T)) tasks,
                    Duration timeout) {
        // The cast yields null both for a null reference and for an
        // executor that is not an AbstractExecutorService.
        AbstractExecutorService aes = cast(AbstractExecutorService)es;
        if(aes is null)
            throw new RejectedExecutionException("ExecutorService is null");

        // Hand the winning task's result back to the caller. The
        // previous code dropped it, leaving this non-void function
        // without a return value.
        return aes.invokeAny!(T)(tasks, timeout);
    }

    /** Cannot instantiate. */
    private this() {}
}

// Non-classes supporting the methods

/**
 * A callable that runs given task and returns given result.
*
 * This specialization handles {@code T == void}: {@code call} runs the
 * task and yields nothing. Anything thrown by the task is caught and
 * logged as a warning instead of being propagated.
 */
private final class RunnableAdapter(T) : Callable!(T) if(is(T == void)) {
    private Runnable task;

    this(Runnable task) {
        this.task = task;
    }

    T call() {
        try {
            task.run();
        } catch(Throwable failure) {
            // Log the message only; the full throwable is logged in
            // HUNT_DEBUG builds.
            warning(failure.msg);
            version(HUNT_DEBUG) {
                warning(failure);
            }
        }
    }

    override string toString() {
        string prefix = super.toString();
        string wrapped = (cast(Object)task).toString();
        return prefix ~ "[Wrapped task = " ~ wrapped ~ "]";
    }
}

/**
 * A callable that runs a given task and then returns a fixed result.
 *
 * This specialization handles every non-{@code void} {@code T}.
 * Anything thrown by the task is caught and logged as a warning, and
 * the stored result is returned regardless.
 */
private final class RunnableAdapter(T) : Callable!(T) if(!is(T == void)) {
    private Runnable task;
    private T result;

    this(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }

    T call() {
        try {
            task.run();
        } catch(Throwable failure) {
            // Log the message only; the full throwable is logged in
            // HUNT_DEBUG builds.
            warning(failure.msg);
            version(HUNT_DEBUG) {
                warning(failure);
            }
        }
        return result;
    }

    override string toString() {
        string prefix = super.toString();
        string wrapped = (cast(Object)task).toString();
        return prefix ~ "[Wrapped task = " ~ wrapped ~ "]";
    }
}

// /**
//  * A callable that runs under established access control settings.
//  */
// private final class PrivilegedCallable!(T) : Callable!(T) {
//     Callable!(T) task;
//     AccessControlContext acc;

//     PrivilegedCallable(Callable!(T) task) {
//         this.task = task;
//         this.acc = AccessController.getContext();
//     }

//     T call() throws Exception {
//         try {
//             return AccessController.doPrivileged(
//                 new PrivilegedExceptionAction!(T)() {
//                     T run() throws Exception {
//                         return task.call();
//                     }
//                 }, acc);
//         } catch (PrivilegedActionException e) {
//             throw e.getException();
//         }
//     }

//     string toString() {
//         return super.toString() ~ "[Wrapped task = " ~ task ~ "]";
//     }
// }

// /**
//  * A callable that runs under established access control settings and
//  * current ClassLoader.
//  */
// private final class PrivilegedCallableUsingCurrentClassLoader(T)
//         : Callable!(T) {
//     Callable!(T) task;
//     AccessControlContext acc;
//     ClassLoader ccl;

//     this(Callable!(T) task) {
//         SecurityManager sm = System.getSecurityManager();
//         if (sm !is null) {
//             // Calls to getContextClassLoader from this class
//             // never trigger a security check, but we check
//             // whether our callers have this permission anyways.
//             sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);

//             // Whether setContextClassLoader turns out to be necessary
//             // or not, we fail fast if permission is not available.
//             sm.checkPermission(new RuntimePermission("setContextClassLoader"));
//         }
//         this.task = task;
//         this.acc = AccessController.getContext();
//         this.ccl = Thread.getThis().getContextClassLoader();
//     }

//     T call() throws Exception {
//         try {
//             return AccessController.doPrivileged(
//                 new PrivilegedExceptionAction!(T)() {
//                     T run() throws Exception {
//                         Thread t = Thread.getThis();
//                         ClassLoader cl = t.getContextClassLoader();
//                         if (ccl == cl) {
//                             return task.call();
//                         } else {
//                             t.setContextClassLoader(ccl);
//                             try {
//                                 return task.call();
//                             } finally {
//                                 t.setContextClassLoader(cl);
//                             }
//                         }
//                     }
//                 }, acc);
//         } catch (PrivilegedActionException e) {
//             throw e.getException();
//         }
//     }

//     string toString() {
//         return super.toString() ~ "[Wrapped task = " ~ task ~ "]";
//     }
// }

/**
 * No-op hook that accepts an executor service and ignores it.
 *
 * NOTE(review): presumably a stand-in for Java's
 * {@code Reference.reachabilityFence} kept while porting; confirm before
 * removing, since other code in this module calls it.
 */
void reachabilityFence(ExecutorService) {
    // Intentionally empty.
    // TODO: Tasks pending completion -@zxp at 5/10/2019, 10:50:31 AM
    // remove this
}

/**
 * A wrapper class that exposes only the ExecutorService methods
 * of an ExecutorService implementation.
*/
private class DelegatedExecutorService(U) : ExecutorService
        if(is(U : ExecutorService)) {

    // The wrapped executor; every method below forwards to it.
    private U e;

    this(U executor) { e = executor; }

    void execute(Runnable command) {
        try {
            e.execute(command);
        } finally { reachabilityFence(this); }
    }

    void shutdown() { e.shutdown(); }

    List!(Runnable) shutdownNow() {
        try {
            return e.shutdownNow();
        } finally { reachabilityFence(this); }
    }

    bool isShutdown() {
        try {
            return e.isShutdown();
        } finally { reachabilityFence(this); }
    }

    bool isTerminated() {
        try {
            return e.isTerminated();
        } finally { reachabilityFence(this); }
    }

    bool awaitTermination(Duration timeout) {
        try {
            return e.awaitTermination(timeout);
        } finally { reachabilityFence(this); }
    }

    Future!void submit(Runnable task) {
        try {
            return e.submit(task);
        } finally { reachabilityFence(this); }
    }

    Future!(T) submit(T)(Callable!(T) task) {
        try {
            return e.submit(task);
        } finally { reachabilityFence(this); }
    }

    Future!(T) submit(T)(Runnable task, T result) {
        try {
            return e.submit(task, result);
        } finally { reachabilityFence(this); }
    }

    List!(Future!(T)) invokeAll(T)(Collection!(Callable!(T)) tasks) {
        try {
            return e.invokeAll(tasks);
        } finally { reachabilityFence(this); }
    }

    List!(Future!(T)) invokeAll(T)(Collection!(Callable!(T)) tasks,
            Duration timeout) {
        try {
            // The timeout is a Duration, so no separate time-unit argument is
            // forwarded (the original passed an undeclared `unit`).
            return e.invokeAll(tasks, timeout);
        } finally { reachabilityFence(this); }
    }

    T invokeAny(T)(Collection!(Callable!(T)) tasks) {
        try {
            return e.invokeAny(tasks);
        } finally { reachabilityFence(this); }
    }

    T invokeAny(T)(Collection!(Callable!(T)) tasks,
            Duration timeout) {
        try {
            // Same as the timed invokeAll: only the Duration is forwarded.
            return e.invokeAny(tasks, timeout);
        } finally { reachabilityFence(this); }
    }
}

/**
 * A delegating wrapper that additionally offers a finalize() method which
 * shuts the wrapped executor down.
 */
private class FinalizableDelegatedExecutorService(T) : DelegatedExecutorService!T {
    this(T executor) {
        super(executor);
    }

    // NOTE(review): D has no Object.finalize hook, so nothing calls this
    // automatically; it only runs when invoked explicitly. Confirm whether a
    // destructor or an explicit shutdown by the owner is the intended
    // replacement for the Java finalizer.
    protected void finalize() {
        super.shutdown();
    }
}

/**
 * A wrapper class that exposes only the ScheduledExecutorService
 * methods of a ScheduledExecutorService implementation.
 */
private class DelegatedScheduledExecutorService(T) : DelegatedExecutorService!T,
        ScheduledExecutorService if(is(T : ScheduledExecutorService)){

    // Second reference to the same executor that is passed to the base class;
    // the scheduling methods below use this field.
    private T e;

    this(T executor) {
        super(executor);
        e = executor;
    }

    ScheduledFuture!void schedule(Runnable command, Duration delay) {
        return e.schedule(command, delay);
    }

    ScheduledFuture!(V) schedule(V)(Callable!(V) callable, Duration delay) {
        return e.schedule!V(callable, delay);
    }

    ScheduledFuture!void scheduleAtFixedRate(Runnable command, Duration initialDelay, Duration period) {
        return e.scheduleAtFixedRate(command, initialDelay, period);
    }

    ScheduledFuture!void scheduleWithFixedDelay(Runnable command, Duration initialDelay, Duration delay) {
        return e.scheduleWithFixedDelay(command, initialDelay, delay);
    }
}