1 /* 2 * Hunt - A refined core library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.concurrency.BlockingQueue; 13 14 import hunt.collection.Collection; 15 import hunt.collection.Queue; 16 import core.time; 17 18 /** 19 * A {@link Queue} that additionally supports operations that wait for 20 * the queue to become non-empty when retrieving an element, and wait 21 * for space to become available in the queue when storing an element. 22 * 23 * <p>{@code BlockingQueue} methods come in four forms, with different ways 24 * of handling operations that cannot be satisfied immediately, but may be 25 * satisfied at some point in the future: 26 * one throws an exception, the second returns a special value (either 27 * {@code null} or {@code false}, depending on the operation), the third 28 * blocks the current thread indefinitely until the operation can succeed, 29 * and the fourth blocks for only a given maximum time limit before giving 30 * up. These methods are summarized in the following table: 31 * 32 * <table class="plain"> 33 * <caption>Summary of BlockingQueue methods</caption> 34 * <tr> 35 * <td></td> 36 * <th scope="col" style="font-weight:normal; font-style:italic">Throws exception</th> 37 * <th scope="col" style="font-weight:normal; font-style:italic">Special value</th> 38 * <th scope="col" style="font-weight:normal; font-style:italic">Blocks</th> 39 * <th scope="col" style="font-weight:normal; font-style:italic">Times out</th> 40 * </tr> 41 * <tr> 42 * <th scope="row" style="text-align:left">Insert</th> 43 * <td>{@link #add(Object) add(e)}</td> 44 * <td>{@link #offer(Object) offer(e)}</td> 45 * <td>{@link #put(Object) put(e)}</td> 46 * <td>{@link #offer(Object, long, TimeUnit) offer(e, time, unit)}</td> 47 * </tr> 48 * <tr> 49 * <th scope="row" style="text-align:left">Remove</th> 50 * <td>{@link #remove() remove()}</td> 51 * <td>{@link #poll() poll()}</td> 52 * <td>{@link #take() take()}</td> 53 * <td>{@link #poll(long, TimeUnit) poll(time, unit)}</td> 54 * </tr> 55 * <tr> 56 * <th scope="row" style="text-align:left">Examine</th> 57 * <td>{@link #element() element()}</td> 58 * <td>{@link #peek() peek()}</td> 59 * <td style="font-style: italic">not applicable</td> 60 * <td style="font-style: italic">not applicable</td> 61 * </tr> 62 * </table> 63 * 64 * <p>A {@code BlockingQueue} does not accept {@code null} elements. 65 * Implementations throw {@code NullPointerException} on attempts 66 * to {@code add}, {@code put} or {@code offer} a {@code null}. A 67 * {@code null} is used as a sentinel value to indicate failure of 68 * {@code poll} operations. 69 * 70 * <p>A {@code BlockingQueue} may be capacity bounded. At any given 71 * time it may have a {@code remainingCapacity} beyond which no 72 * additional elements can be {@code put} without blocking. 73 * A {@code BlockingQueue} without any intrinsic capacity constraints always 74 * reports a remaining capacity of {@code Integer.MAX_VALUE}. 75 * 76 * <p>{@code BlockingQueue} implementations are designed to be used 77 * primarily for producer-consumer queues, but additionally support 78 * the {@link Collection} interface. So, for example, it is 79 * possible to remove an arbitrary element from a queue using 80 * {@code remove(x)}. However, such operations are in general 81 * <em>not</em> performed very efficiently, and are intended for only 82 * occasional use, such as when a queued message is cancelled. 83 * 84 * <p>{@code BlockingQueue} implementations are thread-safe. All 85 * queuing methods achieve their effects atomically using internal 86 * locks or other forms of concurrency control. However, the 87 * <em>bulk</em> Collection operations {@code addAll}, 88 * {@code containsAll}, {@code retainAll} and {@code removeAll} are 89 * <em>not</em> necessarily performed atomically unless specified 90 * otherwise in an implementation. So it is possible, for example, for 91 * {@code addAll(c)} to fail (throwing an exception) after adding 92 * only some of the elements in {@code c}. 93 * 94 * <p>A {@code BlockingQueue} does <em>not</em> intrinsically support 95 * any kind of "close" or "shutdown" operation to 96 * indicate that no more items will be added. The needs and usage of 97 * such features tend to be implementation-dependent. For example, a 98 * common tactic is for producers to insert special 99 * <em>end-of-stream</em> or <em>poison</em> objects, that are 100 * interpreted accordingly when taken by consumers. 101 * 102 * <p> 103 * Usage example, based on a typical producer-consumer scenario. 104 * Note that a {@code BlockingQueue} can safely be used with multiple 105 * producers and multiple consumers. 106 * <pre> {@code 107 * class Producer implements Runnable { 108 * private final BlockingQueue queue; 109 * Producer(BlockingQueue q) { queue = q; } 110 * void run() { 111 * try { 112 * while (true) { queue.put(produce()); } 113 * } catch (InterruptedException ex) { ... handle ...} 114 * } 115 * Object produce() { ... } 116 * } 117 * 118 * class Consumer implements Runnable { 119 * private final BlockingQueue queue; 120 * Consumer(BlockingQueue q) { queue = q; } 121 * void run() { 122 * try { 123 * while (true) { consume(queue.take()); } 124 * } catch (InterruptedException ex) { ... handle ...} 125 * } 126 * void consume(Object x) { ... } 127 * } 128 * 129 * class Setup { 130 * void main() { 131 * BlockingQueue q = new SomeQueueImplementation(); 132 * Producer p = new Producer(q); 133 * Consumer c1 = new Consumer(q); 134 * Consumer c2 = new Consumer(q); 135 * new Thread(p).start(); 136 * new Thread(c1).start(); 137 * new Thread(c2).start(); 138 * } 139 * }}</pre> 140 * 141 * <p>Memory consistency effects: As with other concurrent 142 * collections, actions in a thread prior to placing an object into a 143 * {@code BlockingQueue} 144 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> 145 * actions subsequent to the access or removal of that element from 146 * the {@code BlockingQueue} in another thread. 147 * 148 * <p>This interface is a member of the 149 * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework"> 150 * Java Collections Framework</a>. 151 * 152 * @author Doug Lea 153 * @param (E) the type of elements held in this queue 154 */ 155 interface BlockingQueue(E) : Queue!(E) { 156 /** 157 * Inserts the specified element into this queue if it is possible to do 158 * so immediately without violating capacity restrictions, returning 159 * {@code true} upon success and throwing an 160 * {@code IllegalStateException} if no space is currently available. 161 * When using a capacity-restricted queue, it is generally preferable to 162 * use {@link #offer(Object) offer}. 163 * 164 * @param e the element to add 165 * @return {@code true} (as specified by {@link Collection#add}) 166 * @throws IllegalStateException if the element cannot be added at this 167 * time due to capacity restrictions 168 * @throws ClassCastException if the class of the specified element 169 * prevents it from being added to this queue 170 * @throws NullPointerException if the specified element is null 171 * @throws IllegalArgumentException if some property of the specified 172 * element prevents it from being added to this queue 173 */ 174 bool add(E e); 175 176 /** 177 * Inserts the specified element into this queue if it is possible to do 178 * so immediately without violating capacity restrictions, returning 179 * {@code true} upon success and {@code false} if no space is currently 180 * available. When using a capacity-restricted queue, this method is 181 * generally preferable to {@link #add}, which can fail to insert an 182 * element only by throwing an exception. 183 * 184 * @param e the element to add 185 * @return {@code true} if the element was added to this queue, else 186 * {@code false} 187 * @throws ClassCastException if the class of the specified element 188 * prevents it from being added to this queue 189 * @throws NullPointerException if the specified element is null 190 * @throws IllegalArgumentException if some property of the specified 191 * element prevents it from being added to this queue 192 */ 193 bool offer(E e); 194 195 /** 196 * Inserts the specified element into this queue, waiting if necessary 197 * for space to become available. 198 * 199 * @param e the element to add 200 * @throws InterruptedException if interrupted while waiting 201 * @throws ClassCastException if the class of the specified element 202 * prevents it from being added to this queue 203 * @throws NullPointerException if the specified element is null 204 * @throws IllegalArgumentException if some property of the specified 205 * element prevents it from being added to this queue 206 */ 207 void put(E e); 208 209 /** 210 * Inserts the specified element into this queue, waiting up to the 211 * specified wait time if necessary for space to become available. 212 * 213 * @param e the element to add 214 * @param timeout how long to wait before giving up, in units of 215 * {@code unit} 216 * @param unit a {@code TimeUnit} determining how to interpret the 217 * {@code timeout} parameter 218 * @return {@code true} if successful, or {@code false} if 219 * the specified waiting time elapses before space is available 220 * @throws InterruptedException if interrupted while waiting 221 * @throws ClassCastException if the class of the specified element 222 * prevents it from being added to this queue 223 * @throws NullPointerException if the specified element is null 224 * @throws IllegalArgumentException if some property of the specified 225 * element prevents it from being added to this queue 226 */ 227 bool offer(E e, Duration timeout); 228 229 /** 230 * Retrieves and removes the head of this queue, waiting if necessary 231 * until an element becomes available. 232 * 233 * @return the head of this queue 234 * @throws InterruptedException if interrupted while waiting 235 */ 236 E take(); 237 238 /** 239 * Retrieves and removes the head of this queue, waiting up to the 240 * specified wait time if necessary for an element to become available. 241 * 242 * @param timeout how long to wait before giving up, in units of 243 * {@code unit} 244 * @param unit a {@code TimeUnit} determining how to interpret the 245 * {@code timeout} parameter 246 * @return the head of this queue, or {@code null} if the 247 * specified waiting time elapses before an element is available 248 * @throws InterruptedException if interrupted while waiting 249 */ 250 E poll(Duration timeout); 251 252 alias poll = Queue!E.poll; 253 254 /** 255 * Returns the number of additional elements that this queue can ideally 256 * (in the absence of memory or resource constraints) accept without 257 * blocking, or {@code Integer.MAX_VALUE} if there is no intrinsic 258 * limit. 259 * 260 * <p>Note that you <em>cannot</em> always tell if an attempt to insert 261 * an element will succeed by inspecting {@code remainingCapacity} 262 * because it may be the case that another thread is about to 263 * insert or remove an element. 264 * 265 * @return the remaining capacity 266 */ 267 int remainingCapacity(); 268 269 /** 270 * Removes a single instance of the specified element from this queue, 271 * if it is present. More formally, removes an element {@code e} such 272 * that {@code o.equals(e)}, if this queue contains one or more such 273 * elements. 274 * Returns {@code true} if this queue contained the specified element 275 * (or equivalently, if this queue changed as a result of the call). 276 * 277 * @param o element to be removed from this queue, if present 278 * @return {@code true} if this queue changed as a result of the call 279 * @throws ClassCastException if the class of the specified element 280 * is incompatible with this queue 281 * (<a href="{@docRoot}/java.base/java/util/Collection.html#optional-restrictions">optional</a>) 282 * @throws NullPointerException if the specified element is null 283 * (<a href="{@docRoot}/java.base/java/util/Collection.html#optional-restrictions">optional</a>) 284 */ 285 bool remove(E o); 286 287 /** 288 * Returns {@code true} if this queue contains the specified element. 289 * More formally, returns {@code true} if and only if this queue contains 290 * at least one element {@code e} such that {@code o.equals(e)}. 291 * 292 * @param o object to be checked for containment in this queue 293 * @return {@code true} if this queue contains the specified element 294 * @throws ClassCastException if the class of the specified element 295 * is incompatible with this queue 296 * (<a href="{@docRoot}/java.base/java/util/Collection.html#optional-restrictions">optional</a>) 297 * @throws NullPointerException if the specified element is null 298 * (<a href="{@docRoot}/java.base/java/util/Collection.html#optional-restrictions">optional</a>) 299 */ 300 bool contains(E o); 301 302 /** 303 * Removes all available elements from this queue and adds them 304 * to the given collection. This operation may be more 305 * efficient than repeatedly polling this queue. A failure 306 * encountered while attempting to add elements to 307 * collection {@code c} may result in elements being in neither, 308 * either or both collections when the associated exception is 309 * thrown. Attempts to drain a queue to itself result in 310 * {@code IllegalArgumentException}. Further, the behavior of 311 * this operation is undefined if the specified collection is 312 * modified while the operation is in progress. 313 * 314 * @param c the collection to transfer elements into 315 * @return the number of elements transferred 316 * @throws UnsupportedOperationException if addition of elements 317 * is not supported by the specified collection 318 * @throws ClassCastException if the class of an element of this queue 319 * prevents it from being added to the specified collection 320 * @throws NullPointerException if the specified collection is null 321 * @throws IllegalArgumentException if the specified collection is this 322 * queue, or some property of an element of this queue prevents 323 * it from being added to the specified collection 324 */ 325 int drainTo(Collection!(E) c); 326 327 /** 328 * Removes at most the given number of available elements from 329 * this queue and adds them to the given collection. A failure 330 * encountered while attempting to add elements to 331 * collection {@code c} may result in elements being in neither, 332 * either or both collections when the associated exception is 333 * thrown. Attempts to drain a queue to itself result in 334 * {@code IllegalArgumentException}. Further, the behavior of 335 * this operation is undefined if the specified collection is 336 * modified while the operation is in progress. 337 * 338 * @param c the collection to transfer elements into 339 * @param maxElements the maximum number of elements to transfer 340 * @return the number of elements transferred 341 * @throws UnsupportedOperationException if addition of elements 342 * is not supported by the specified collection 343 * @throws ClassCastException if the class of an element of this queue 344 * prevents it from being added to the specified collection 345 * @throws NullPointerException if the specified collection is null 346 * @throws IllegalArgumentException if the specified collection is this 347 * queue, or some property of an element of this queue prevents 348 * it from being added to the specified collection 349 */ 350 int drainTo(Collection!(E) c, int maxElements); 351 } 352 353 354 // TODO: Tasks pending completion -@zxp at 12/31/2018, 10:15:14 AM 355 // 356 // abstract class AbstractBlockingQueue(E) : AbstractQueue!(E), BlockingQueue!(E) { 357 358 // /** 359 // * Constructor for use by subclasses. 360 // */ 361 // protected this() { 362 // } 363 364 // /** 365 // * Inserts the specified element into this queue if it is possible to do so 366 // * immediately without violating capacity restrictions, returning 367 // * {@code true} upon success and throwing an {@code IllegalStateException} 368 // * if no space is currently available. 369 // * 370 // * <p>This implementation returns {@code true} if {@code offer} succeeds, 371 // * else throws an {@code IllegalStateException}. 372 // * 373 // * @param e the element to add 374 // * @return {@code true} (as specified by {@link Collection#add}) 375 // * @throws IllegalStateException if the element cannot be added at this 376 // * time due to capacity restrictions 377 // * @throws ClassCastException if the class of the specified element 378 // * prevents it from being added to this queue 379 // * @throws NullPointerException if the specified element is null and 380 // * this queue does not permit null elements 381 // * @throws IllegalArgumentException if some property of this element 382 // * prevents it from being added to this queue 383 // */ 384 // override bool add(E e) { 385 // if (offer(e)) 386 // return true; 387 // else 388 // throw new IllegalStateException("Queue full"); 389 // } 390 391 // /** 392 // * Retrieves and removes the head of this queue. This method differs 393 // * from {@link #poll poll} only in that it throws an exception if this 394 // * queue is empty. 395 // * 396 // * <p>This implementation returns the result of {@code poll} 397 // * unless the queue is empty. 398 // * 399 // * @return the head of this queue 400 // * @throws NoSuchElementException if this queue is empty 401 // */ 402 // E remove() { 403 // E x = poll(); 404 // static if(is(E == class) || is(E == string)) { 405 // if (x is null) throw new NoSuchElementException(); 406 // } 407 // return x; 408 // } 409 410 // /** 411 // * Retrieves, but does not remove, the head of this queue. This method 412 // * differs from {@link #peek peek} only in that it throws an exception if 413 // * this queue is empty. 414 // * 415 // * <p>This implementation returns the result of {@code peek} 416 // * unless the queue is empty. 417 // * 418 // * @return the head of this queue 419 // * @throws NoSuchElementException if this queue is empty 420 // */ 421 // E element() { 422 // E x = peek(); 423 424 // static if(is(E == class) || is(E == string)) { 425 // if (x is null) throw new NoSuchElementException(); 426 // } 427 // return x; 428 // } 429 430 // /** 431 // * Removes all of the elements from this queue. 432 // * The queue will be empty after this call returns. 433 // * 434 // * <p>This implementation repeatedly invokes {@link #poll poll} until it 435 // * returns {@code null}. 436 // */ 437 // override void clear() { 438 // static if(is(E == class) || is(E == string)) { 439 // while (poll() !is null) {} 440 // } else { 441 // while(size()>0) { 442 // poll(); 443 // } 444 // } 445 // } 446 447 // /** 448 // * Adds all of the elements in the specified collection to this 449 // * queue. Attempts to addAll of a queue to itself result in 450 // * {@code IllegalArgumentException}. Further, the behavior of 451 // * this operation is undefined if the specified collection is 452 // * modified while the operation is in progress. 453 // * 454 // * <p>This implementation iterates over the specified collection, 455 // * and adds each element returned by the iterator to this 456 // * queue, in turn. A runtime exception encountered while 457 // * trying to add an element (including, in particular, a 458 // * {@code null} element) may result in only some of the elements 459 // * having been successfully added when the associated exception is 460 // * thrown. 461 // * 462 // * @param c collection containing elements to be added to this queue 463 // * @return {@code true} if this queue changed as a result of the call 464 // * @throws ClassCastException if the class of an element of the specified 465 // * collection prevents it from being added to this queue 466 // * @throws NullPointerException if the specified collection contains a 467 // * null element and this queue does not permit null elements, 468 // * or if the specified collection is null 469 // * @throws IllegalArgumentException if some property of an element of the 470 // * specified collection prevents it from being added to this 471 // * queue, or if the specified collection is this queue 472 // * @throws IllegalStateException if not all the elements can be added at 473 // * this time due to insertion restrictions 474 // * @see #add(Object) 475 // */ 476 // override bool addAll(Collection!E c) { 477 // if (c is null) 478 // throw new NullPointerException(); 479 // if (c is this) 480 // throw new IllegalArgumentException(); 481 // bool modified = false; 482 // foreach (E e ; c) { 483 // if (add(e)) modified = true; 484 // } 485 // return modified; 486 // } 487 488 // override bool opEquals(IObject o) { 489 // return opEquals(cast(Object) o); 490 // } 491 492 // override bool opEquals(Object o) { 493 // return super.opEquals(o); 494 // } 495 496 // override size_t toHash() @trusted nothrow { 497 // return super.toHash(); 498 // } 499 500 // override string toString() { 501 // return super.toString(); 502 // } 503 // }