1 /*
2  * Hunt - A refined core library for the D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.concurrency.BlockingQueue;
13 
14 import hunt.collection.Collection;
15 import hunt.collection.Queue;
16 import core.time;
17 
18 /**
19  * A {@link Queue} that additionally supports operations that wait for
20  * the queue to become non-empty when retrieving an element, and wait
21  * for space to become available in the queue when storing an element.
22  *
23  * <p>{@code BlockingQueue} methods come in four forms, with different ways
24  * of handling operations that cannot be satisfied immediately, but may be
25  * satisfied at some point in the future:
26  * one throws an exception, the second returns a special value (either
27  * {@code null} or {@code false}, depending on the operation), the third
28  * blocks the current thread indefinitely until the operation can succeed,
29  * and the fourth blocks for only a given maximum time limit before giving
30  * up.  These methods are summarized in the following table:
31  *
32  * <table class="plain">
33  * <caption>Summary of BlockingQueue methods</caption>
34  *  <tr>
35  *    <td></td>
36  *    <th scope="col" style="font-weight:normal; font-style:italic">Throws exception</th>
37  *    <th scope="col" style="font-weight:normal; font-style:italic">Special value</th>
38  *    <th scope="col" style="font-weight:normal; font-style:italic">Blocks</th>
39  *    <th scope="col" style="font-weight:normal; font-style:italic">Times out</th>
40  *  </tr>
41  *  <tr>
42  *    <th scope="row" style="text-align:left">Insert</th>
43  *    <td>{@link #add(Object) add(e)}</td>
44  *    <td>{@link #offer(Object) offer(e)}</td>
45  *    <td>{@link #put(Object) put(e)}</td>
46  *    <td>{@link #offer(E, Duration) offer(e, timeout)}</td>
47  *  </tr>
48  *  <tr>
49  *    <th scope="row" style="text-align:left">Remove</th>
50  *    <td>{@link #remove() remove()}</td>
51  *    <td>{@link #poll() poll()}</td>
52  *    <td>{@link #take() take()}</td>
53  *    <td>{@link #poll(Duration) poll(timeout)}</td>
54  *  </tr>
55  *  <tr>
56  *    <th scope="row" style="text-align:left">Examine</th>
57  *    <td>{@link #element() element()}</td>
58  *    <td>{@link #peek() peek()}</td>
59  *    <td style="font-style: italic">not applicable</td>
60  *    <td style="font-style: italic">not applicable</td>
61  *  </tr>
62  * </table>
63  *
64  * <p>A {@code BlockingQueue} does not accept {@code null} elements.
65  * Implementations throw {@code NullPointerException} on attempts
66  * to {@code add}, {@code put} or {@code offer} a {@code null}.  A
67  * {@code null} is used as a sentinel value to indicate failure of
68  * {@code poll} operations.
69  *
70  * <p>A {@code BlockingQueue} may be capacity bounded. At any given
71  * time it may have a {@code remainingCapacity} beyond which no
72  * additional elements can be {@code put} without blocking.
73  * A {@code BlockingQueue} without any intrinsic capacity constraints always
74  * reports a remaining capacity of {@code int.max}.
75  *
76  * <p>{@code BlockingQueue} implementations are designed to be used
77  * primarily for producer-consumer queues, but additionally support
78  * the {@link Collection} interface.  So, for example, it is
79  * possible to remove an arbitrary element from a queue using
80  * {@code remove(x)}. However, such operations are in general
81  * <em>not</em> performed very efficiently, and are intended for only
82  * occasional use, such as when a queued message is cancelled.
83  *
84  * <p>{@code BlockingQueue} implementations are thread-safe.  All
85  * queuing methods achieve their effects atomically using internal
86  * locks or other forms of concurrency control. However, the
87  * <em>bulk</em> Collection operations {@code addAll},
88  * {@code containsAll}, {@code retainAll} and {@code removeAll} are
89  * <em>not</em> necessarily performed atomically unless specified
90  * otherwise in an implementation. So it is possible, for example, for
91  * {@code addAll(c)} to fail (throwing an exception) after adding
92  * only some of the elements in {@code c}.
93  *
94  * <p>A {@code BlockingQueue} does <em>not</em> intrinsically support
95  * any kind of &quot;close&quot; or &quot;shutdown&quot; operation to
96  * indicate that no more items will be added.  The needs and usage of
97  * such features tend to be implementation-dependent. For example, a
98  * common tactic is for producers to insert special
99  * <em>end-of-stream</em> or <em>poison</em> objects, that are
100  * interpreted accordingly when taken by consumers.
101  *
102  * <p>
103  * Usage example, based on a typical producer-consumer scenario.
104  * Note that a {@code BlockingQueue} can safely be used with multiple
105  * producers and multiple consumers.
106  * <pre> {@code
107  * class Producer implements Runnable {
108  *   private final BlockingQueue queue;
109  *   Producer(BlockingQueue q) { queue = q; }
110  *   void run() {
111  *     try {
112  *       while (true) { queue.put(produce()); }
113  *     } catch (InterruptedException ex) { ... handle ...}
114  *   }
115  *   Object produce() { ... }
116  * }
117  *
118  * class Consumer implements Runnable {
119  *   private final BlockingQueue queue;
120  *   Consumer(BlockingQueue q) { queue = q; }
121  *   void run() {
122  *     try {
123  *       while (true) { consume(queue.take()); }
124  *     } catch (InterruptedException ex) { ... handle ...}
125  *   }
126  *   void consume(Object x) { ... }
127  * }
128  *
129  * class Setup {
130  *   void main() {
131  *     BlockingQueue q = new SomeQueueImplementation();
132  *     Producer p = new Producer(q);
133  *     Consumer c1 = new Consumer(q);
134  *     Consumer c2 = new Consumer(q);
135  *     new Thread(p).start();
136  *     new Thread(c1).start();
137  *     new Thread(c2).start();
138  *   }
139  * }}</pre>
140  *
141  * <p>Memory consistency effects: As with other concurrent
142  * collections, actions in a thread prior to placing an object into a
143  * {@code BlockingQueue}
144  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
145  * actions subsequent to the access or removal of that element from
146  * the {@code BlockingQueue} in another thread.
147  *
148  * <p>This interface is modelled on the {@code BlockingQueue} of the
149  * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
150  * Java Collections Framework</a>; this module is its D counterpart.
151  *
152  * @author Doug Lea
153  * @param (E) the type of elements held in this queue
154  */
155 interface BlockingQueue(E) : Queue!(E) {
156     /**
157      * Inserts the specified element into this queue if it is possible to do
158      * so immediately without violating capacity restrictions, returning
159      * {@code true} upon success and throwing an
160      * {@code IllegalStateException} if no space is currently available.
161      * When using a capacity-restricted queue, it is generally preferable to
162      * use {@link #offer(E) offer}, which signals a full queue by returning
163      * {@code false} rather than by throwing.
164      * @param e the element to add
165      * @return {@code true} (as specified by {@link Collection#add})
166      * @throws IllegalStateException if the element cannot be added at this
167      *         time due to capacity restrictions
168      * @throws ClassCastException if the class of the specified element
169      *         prevents it from being added to this queue
170      * @throws NullPointerException if the specified element is null
171      * @throws IllegalArgumentException if some property of the specified
172      *         element prevents it from being added to this queue
173      */
174     bool add(E e);
175 
176     /**
177      * Inserts the specified element into this queue if it is possible to do
178      * so immediately without violating capacity restrictions, returning
179      * {@code true} upon success and {@code false} if no space is currently
180      * available.  When using a capacity-restricted queue, this method is
181      * generally preferable to {@link #add}, which can fail to insert an
182      * element only by throwing an exception.
183      *
184      * @param e the element to add
185      * @return {@code true} if the element was added to this queue, else
186      *         {@code false}
187      * @throws ClassCastException if the class of the specified element
188      *         prevents it from being added to this queue
189      * @throws NullPointerException if the specified element is null
190      * @throws IllegalArgumentException if some property of the specified
191      *         element prevents it from being added to this queue
192      */
193     bool offer(E e);
194 
195     /**
196      * Inserts the specified element into this queue, waiting if necessary
197      * for space to become available.  No time limit applies; see
198      * {@link #offer(E, Duration)} for the variant that gives up after a timeout.
199      * @param e the element to add
200      * @throws InterruptedException if interrupted while waiting
201      * @throws ClassCastException if the class of the specified element
202      *         prevents it from being added to this queue
203      * @throws NullPointerException if the specified element is null
204      * @throws IllegalArgumentException if some property of the specified
205      *         element prevents it from being added to this queue
206      */
207     void put(E e);
208 
209     /**
210      * Inserts the specified element into this queue, waiting up to the
211      * specified wait time if necessary for space to become available.
212      *
213      * @param e the element to add
214      * @param timeout how long to wait before giving up, as a
215      *        {@code core.time.Duration}; a {@code Duration} is already a
216      *        complete length of time, so this method takes no separate
217      *        {@code TimeUnit} argument
218      * @return {@code true} if successful, or {@code false} if
219      *         the specified waiting time elapses before space is available
220      * @throws InterruptedException if interrupted while waiting
221      * @throws ClassCastException if the class of the specified element
222      *         prevents it from being added to this queue
223      * @throws NullPointerException if the specified element is null
224      * @throws IllegalArgumentException if some property of the specified
225      *         element prevents it from being added to this queue
226      */
227     bool offer(E e, Duration timeout);
228 
229     /**
230      * Retrieves and removes the head of this queue, waiting if necessary
231      * until an element becomes available.  No time limit applies; see
232      * {@link #poll(Duration)} for the variant that gives up after a timeout.
233      * @return the head of this queue
234      * @throws InterruptedException if interrupted while waiting
235      */
236     E take();
237 
238     /**
239      * Retrieves and removes the head of this queue, waiting up to the
240      * specified wait time if necessary for an element to become available.
241      * @param timeout how long to wait before giving up, as a
242      *        {@code core.time.Duration}; there is no separate {@code TimeUnit}
243      *        argument because a {@code Duration} is a complete length of time
244      * @return the head of this queue, or {@code null} if the
245      *         specified waiting time elapses before an element is available
246      *         (NOTE(review): the timeout result for an {@code E} that cannot be
247      *         {@code null} is not defined by this interface — confirm with the implementation)
248      * @throws InterruptedException if interrupted while waiting
249      */
250     E poll(Duration timeout);
251     // Keep Queue!E's untimed poll() in the same overload set as poll(Duration) above.
252     alias poll = Queue!E.poll;
253 
254     /**
255      * Returns the number of additional elements that this queue can ideally
256      * (in the absence of memory or resource constraints) accept without
257      * blocking, or {@code int.max} if there is no intrinsic
258      * limit.
259      *
260      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
261      * an element will succeed by inspecting {@code remainingCapacity}
262      * because it may be the case that another thread is about to
263      * insert or remove an element.
264      *
265      * @return the remaining capacity
266      */
267     int remainingCapacity();
268 
269     /**
270      * Removes a single instance of the specified element from this queue,
271      * if it is present.  More formally, removes an element {@code e} such
272      * that {@code o.equals(e)}, if this queue contains one or more such
273      * elements.
274      * Returns {@code true} if this queue contained the specified element
275      * (or equivalently, if this queue changed as a result of the call).
276      *
277      * @param o element to be removed from this queue, if present
278      * @return {@code true} if this queue changed as a result of the call
279      * @throws ClassCastException if the class of the specified element
280      *         is incompatible with this queue
281      * (<a href="{@docRoot}/java.base/java/util/Collection.html#optional-restrictions">optional</a>)
282      * @throws NullPointerException if the specified element is null
283      * (<a href="{@docRoot}/java.base/java/util/Collection.html#optional-restrictions">optional</a>)
284      */
285     bool remove(E o);
286 
287     /**
288      * Returns {@code true} if this queue contains the specified element.
289      * More formally, returns {@code true} if and only if this queue contains
290      * at least one element {@code e} such that {@code o.equals(e)}.
291      *
292      * @param o object to be checked for containment in this queue
293      * @return {@code true} if this queue contains the specified element
294      * @throws ClassCastException if the class of the specified element
295      *         is incompatible with this queue
296      * (<a href="{@docRoot}/java.base/java/util/Collection.html#optional-restrictions">optional</a>)
297      * @throws NullPointerException if the specified element is null
298      * (<a href="{@docRoot}/java.base/java/util/Collection.html#optional-restrictions">optional</a>)
299      */
300     bool contains(E o);
301 
302     /**
303      * Removes all available elements from this queue and adds them
304      * to the given collection.  This operation may be more
305      * efficient than repeatedly polling this queue.  A failure
306      * encountered while attempting to add elements to
307      * collection {@code c} may result in elements being in neither,
308      * either or both collections when the associated exception is
309      * thrown.  Attempts to drain a queue to itself result in
310      * {@code IllegalArgumentException}. Further, the behavior of
311      * this operation is undefined if the specified collection is
312      * modified while the operation is in progress.
313      *
314      * @param c the collection to transfer elements into
315      * @return the number of elements transferred
316      * @throws UnsupportedOperationException if addition of elements
317      *         is not supported by the specified collection
318      * @throws ClassCastException if the class of an element of this queue
319      *         prevents it from being added to the specified collection
320      * @throws NullPointerException if the specified collection is null
321      * @throws IllegalArgumentException if the specified collection is this
322      *         queue, or some property of an element of this queue prevents
323      *         it from being added to the specified collection
324      */
325     int drainTo(Collection!(E) c);
326 
327     /**
328      * Removes at most the given number of available elements from
329      * this queue and adds them to the given collection.  A failure
330      * encountered while attempting to add elements to
331      * collection {@code c} may result in elements being in neither,
332      * either or both collections when the associated exception is
333      * thrown.  Attempts to drain a queue to itself result in
334      * {@code IllegalArgumentException}. Further, the behavior of
335      * this operation is undefined if the specified collection is
336      * modified while the operation is in progress.
337      *
338      * @param c the collection to transfer elements into
339      * @param maxElements the maximum number of elements to transfer
340      * @return the number of elements transferred
341      * @throws UnsupportedOperationException if addition of elements
342      *         is not supported by the specified collection
343      * @throws ClassCastException if the class of an element of this queue
344      *         prevents it from being added to the specified collection
345      * @throws NullPointerException if the specified collection is null
346      * @throws IllegalArgumentException if the specified collection is this
347      *         queue, or some property of an element of this queue prevents
348      *         it from being added to the specified collection
349      */
350     int drainTo(Collection!(E) c, int maxElements);
351 }
352 
353 
354 // TODO: Tasks pending completion -@zxp at 12/31/2018, 10:15:14 AM
355 // 
356 // abstract class AbstractBlockingQueue(E) : AbstractQueue!(E), BlockingQueue!(E) {
357 
358 //     /**
359 //      * Constructor for use by subclasses.
360 //      */
361 //     protected this() {
362 //     }
363 
364 //     /**
365 //      * Inserts the specified element into this queue if it is possible to do so
366 //      * immediately without violating capacity restrictions, returning
367 //      * {@code true} upon success and throwing an {@code IllegalStateException}
368 //      * if no space is currently available.
369 //      *
370 //      * <p>This implementation returns {@code true} if {@code offer} succeeds,
371 //      * else throws an {@code IllegalStateException}.
372 //      *
373 //      * @param e the element to add
374 //      * @return {@code true} (as specified by {@link Collection#add})
375 //      * @throws IllegalStateException if the element cannot be added at this
376 //      *         time due to capacity restrictions
377 //      * @throws ClassCastException if the class of the specified element
378 //      *         prevents it from being added to this queue
379 //      * @throws NullPointerException if the specified element is null and
380 //      *         this queue does not permit null elements
381 //      * @throws IllegalArgumentException if some property of this element
382 //      *         prevents it from being added to this queue
383 //      */
384 //     override bool add(E e) {
385 //         if (offer(e))
386 //             return true;
387 //         else
388 //             throw new IllegalStateException("Queue full");
389 //     }
390 
391 //     /**
392 //      * Retrieves and removes the head of this queue.  This method differs
393 //      * from {@link #poll poll} only in that it throws an exception if this
394 //      * queue is empty.
395 //      *
396 //      * <p>This implementation returns the result of {@code poll}
397 //      * unless the queue is empty.
398 //      *
399 //      * @return the head of this queue
400 //      * @throws NoSuchElementException if this queue is empty
401 //      */
402 //     E remove() {
403 //         E x = poll();
404 //         static if(is(E == class) || is(E == string)) {
405 //             if (x is null) throw new NoSuchElementException();
406 //         }
407 //         return x;
408 //     }
409 
410 //     /**
411 //      * Retrieves, but does not remove, the head of this queue.  This method
412 //      * differs from {@link #peek peek} only in that it throws an exception if
413 //      * this queue is empty.
414 //      *
415 //      * <p>This implementation returns the result of {@code peek}
416 //      * unless the queue is empty.
417 //      *
418 //      * @return the head of this queue
419 //      * @throws NoSuchElementException if this queue is empty
420 //      */
421 //     E element() {
422 //         E x = peek();
423         
424 //         static if(is(E == class) || is(E == string)) {
425 //             if (x is null) throw new NoSuchElementException();
426 //         }
427 //         return x;
428 //     }
429 
430 //     /**
431 //      * Removes all of the elements from this queue.
432 //      * The queue will be empty after this call returns.
433 //      *
434 //      * <p>This implementation repeatedly invokes {@link #poll poll} until it
435 //      * returns {@code null}.
436 //      */
437 //     override void clear() {
438 //         static if(is(E == class) || is(E == string)) {
439 //             while (poll() !is null) {}
440 //         } else {
441 //             while(size()>0) {
442 //                 poll();
443 //             }
444 //         }
445 //     }
446 
447 //     /**
448 //      * Adds all of the elements in the specified collection to this
449 //      * queue.  Attempts to addAll of a queue to itself result in
450 //      * {@code IllegalArgumentException}. Further, the behavior of
451 //      * this operation is undefined if the specified collection is
452 //      * modified while the operation is in progress.
453 //      *
454 //      * <p>This implementation iterates over the specified collection,
455 //      * and adds each element returned by the iterator to this
456 //      * queue, in turn.  A runtime exception encountered while
457 //      * trying to add an element (including, in particular, a
458 //      * {@code null} element) may result in only some of the elements
459 //      * having been successfully added when the associated exception is
460 //      * thrown.
461 //      *
462 //      * @param c collection containing elements to be added to this queue
463 //      * @return {@code true} if this queue changed as a result of the call
464 //      * @throws ClassCastException if the class of an element of the specified
465 //      *         collection prevents it from being added to this queue
466 //      * @throws NullPointerException if the specified collection contains a
467 //      *         null element and this queue does not permit null elements,
468 //      *         or if the specified collection is null
469 //      * @throws IllegalArgumentException if some property of an element of the
470 //      *         specified collection prevents it from being added to this
471 //      *         queue, or if the specified collection is this queue
472 //      * @throws IllegalStateException if not all the elements can be added at
473 //      *         this time due to insertion restrictions
474 //      * @see #add(Object)
475 //      */
476 //     override bool addAll(Collection!E c) {
477 //         if (c is null)
478 //             throw new NullPointerException();
479 //         if (c is this)
480 //             throw new IllegalArgumentException();
481 //         bool modified = false;
482 //         foreach (E e ; c) {
483 //             if (add(e)) modified = true;
484 //         }
485 //         return modified;
486 //     }
487 
488 //     override bool opEquals(IObject o) {
489 //         return opEquals(cast(Object) o);
490 //     }
491     
492 //     override bool opEquals(Object o) {
493 //         return super.opEquals(o);
494 //     }
495 
496 //     override size_t toHash() @trusted nothrow {
497 //         return super.toHash();
498 //     }
499 
500 //     override string toString() {
501 //         return super.toString();
502 //     }
503 // }