1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.concurrency.LinkedBlockingQueue;
13 
14 import hunt.concurrency.atomic.AtomicHelper;
15 import hunt.concurrency.Helpers;
16 import hunt.concurrency.BlockingQueue;
17 
18 import hunt.collection.AbstractQueue;
19 import hunt.collection.Collection;
20 import hunt.collection.Iterator;
21 import hunt.util.DateTime;
22 import hunt.Exceptions;
23 import hunt.Functions;
24 import hunt.Object;
25 
26 // import core.atomic;
27 import core.sync.mutex;
28 import core.sync.condition;
29 import core.time;
30 
31 import hunt.logging.ConsoleLogger;
32 
33 /**
34  * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
35  * linked nodes.
36  * This queue orders elements FIFO (first-in-first-out).
37  * The <em>head</em> of the queue is that element that has been on the
38  * queue the longest time.
39  * The <em>tail</em> of the queue is that element that has been on the
40  * queue the shortest time. New elements
41  * are inserted at the tail of the queue, and the queue retrieval
42  * operations obtain elements at the head of the queue.
43  * Linked queues typically have higher throughput than array-based queues but
44  * less predictable performance in most concurrent applications.
45  *
 * <p>The optional capacity bound constructor argument serves as a
 * way to prevent excessive queue expansion. The capacity, if unspecified,
 * is equal to {@code int.max}.  Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
51  *
52  * <p>This class and its iterator implement all of the <em>optional</em>
53  * methods of the {@link Collection} and {@link Iterator} interfaces.
54  *
55  * <p>This class is a member of the
56  * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
57  * Java Collections Framework</a>.
58  *
59  * @author Doug Lea
60  * @param (E) the type of elements held in this queue
61  */
62 class LinkedBlockingQueue(E) : AbstractQueue!(E), BlockingQueue!(E) {
63 
64     /*
65      * A variant of the "two lock queue" algorithm.  The putLock gates
66      * entry to put (and offer), and has an associated condition for
67      * waiting puts.  Similarly for the takeLock.  The "count" field
68      * that they both rely on is maintained as an atomic to avoid
69      * needing to get both locks in most cases. Also, to minimize need
70      * for puts to get takeLock and vice-versa, cascading notifies are
71      * used. When a put notices that it has enabled at least one take,
72      * it signals taker. That taker in turn signals others if more
73      * items have been entered since the signal. And symmetrically for
74      * takes signalling puts. Operations such as remove(Object) and
75      * iterators acquire both locks.
76      *
77      * Visibility between writers and readers is provided as follows:
78      *
79      * Whenever an element is enqueued, the putLock is acquired and
80      * count updated.  A subsequent reader guarantees visibility to the
81      * enqueued Node by either acquiring the putLock (via fullyLock)
82      * or by acquiring the takeLock, and then reading n = atomicLoad(count);
83      * this gives visibility to the first n items.
84      *
85      * To implement weakly consistent iterators, it appears we need to
86      * keep all Nodes GC-reachable from a predecessor dequeued Node.
87      * That would cause two problems:
88      * - allow a rogue Iterator to cause unbounded memory retention
89      * - cause cross-generational linking of old Nodes to new Nodes if
90      *   a Node was tenured while live, which generational GCs have a
91      *   hard time dealing with, causing repeated major collections.
92      * However, only non-deleted Nodes need to be reachable from
93      * dequeued Nodes, and reachability does not necessarily have to
94      * be of the kind understood by the GC.  We use the trick of
95      * linking a Node that has just been dequeued to itself.  Such a
96      * self-link implicitly means to advance to head.next.
97      */
98 
99     /**
100      * Linked list node class.
101      */
102     static class Node(E) {
103         E item;
104 
105         /**
106          * One of:
107          * - the real successor Node
108          * - this Node, meaning the successor is head.next
109          * - null, meaning there is no successor (this is the last node)
110          */
111         Node!(E) next;
112 
113         this(E x) { item = x; }
114     }
115 
    /** The capacity bound, or int.max if none */
    private int capacity;

    /**
     * Current number of elements.
     * Declared shared and modified through AtomicHelper, because the put side
     * and the take side each change it while holding only their own lock.
     */
    private shared(int) count;

    /**
     * Head of linked list.
     * This is a sentinel node: the first real element lives in head.next.
     * Invariant: head.item is null
     */
    private Node!(E) head;

    /**
     * Tail of linked list.
     * Invariant: last.next is null
     */
    private Node!(E) last;

    /** Lock held by take, poll, etc */
    private Mutex takeLock;

    /** Wait queue for waiting takes (bound to takeLock) */
    private Condition notEmpty;

    /** Lock held by put, offer, etc */
    private Mutex putLock;

    /** Wait queue for waiting puts (bound to putLock) */
    private Condition notFull;
145 
146     private void initilize() {
147         takeLock = new Mutex();
148         putLock = new Mutex();
149         notEmpty = new Condition(takeLock);
150         notFull = new Condition(putLock);
151     }
152 
153     /**
154      * Signals a waiting take. Called only from put/offer (which do not
155      * otherwise ordinarily lock takeLock.)
156      */
157     private void signalNotEmpty() {
158         Mutex takeLock = this.takeLock;
159         takeLock.lock();
160         // scope(exit) takeLock.unlock();
161         try {
162             notEmpty.notify();
163         } finally {
164             takeLock.unlock();
165         }
166     }
167 
168     /**
169      * Signals a waiting put. Called only from take/poll.
170      */
171     private void signalNotFull() {
172         Mutex putLock = this.putLock;
173         putLock.lock();
174         try {
175             notFull.notify();
176         } finally {
177             putLock.unlock();
178         }
179     }
180 
181     /**
182      * Links node at end of queue.
183      *
184      * @param node the node
185      */
186     private void enqueue(Node!(E) node) {
187         // assert putLock.isHeldByCurrentThread();
188         // assert last.next is null;
189         last = last.next = node;
190     }
191 
192     /**
193      * Removes a node from head of queue.
194      *
195      * @return the node
196      */
197     private E dequeue() {
198         // assert takeLock.isHeldByCurrentThread();
199         // assert head.item is null;
200         Node!(E) h = head;
201         Node!(E) first = h.next;
202         h.next = h; // help GC
203         head = first;
204         E x = first.item;
205         first.item = E.init;
206         return x;
207     }
208 
209     /**
210      * Locks to prevent both puts and takes.
211      */
212     void fullyLock() {
213         putLock.lock();
214         takeLock.lock();
215     }
216 
217     /**
218      * Unlocks to allow both puts and takes.
219      */
220     void fullyUnlock() {
221         takeLock.unlock();
222         putLock.unlock();
223     }
224 
225     /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@code int.max}.
228      */
    this() {
        // Delegate to the bounded constructor with the largest int as the bound.
        this(int.max);
    }
232 
233     /**
234      * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
235      *
236      * @param capacity the capacity of this queue
237      * @throws IllegalArgumentException if {@code capacity} is not greater
238      *         than zero
239      */
240     this(int capacity) {
241         if (capacity <= 0) throw new IllegalArgumentException();
242         this.capacity = capacity;
243         last = head = new Node!(E)(E.init);
244         initilize();
245     }
246 
247     /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@code int.max}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
252      *
253      * @param c the collection of elements to initially contain
254      * @throws NullPointerException if the specified collection or any
255      *         of its elements are null
256      */
257     this(Collection!(E) c) {
258         this(int.max);
259         Mutex putLock = this.putLock;
260         putLock.lock(); // Never contended, but necessary for visibility
261         try {
262             int n = 0;
263             foreach (E e ; c) {
264                 static if(is(E == class) || is(E == string)) {
265                     if (e is null) throw new NullPointerException();
266                 }
267                 if (n == capacity)
268                     throw new IllegalStateException("Queue full");
269                 enqueue(new Node!(E)(e));
270                 ++n;
271             }
272             count = n;
273         } finally {
274             putLock.unlock();
275         }
276     }
277 
278     // this doc comment is overridden to remove the reference to collections
279     // greater in size than int.max
280     /**
281      * Returns the number of elements in this queue.
282      *
283      * @return the number of elements in this queue
284      */
285     override int size() {
286         return count;
287     }
288 
289     // this doc comment is a modified copy of the inherited doc comment,
290     // without the reference to unlimited queues.
291     /**
292      * Returns the number of additional elements that this queue can ideally
293      * (in the absence of memory or resource constraints) accept without
294      * blocking. This is always equal to the initial capacity of this queue
295      * less the current {@code size} of this queue.
296      *
297      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
298      * an element will succeed by inspecting {@code remainingCapacity}
299      * because it may be the case that another thread is about to
300      * insert or remove an element.
301      */
302     int remainingCapacity() {
303         return capacity - count;
304     }
305 
306     override bool add(E e) {
307         return super.add(e);
308     }
309 
310     /**
311      * Inserts the specified element at the tail of this queue, waiting if
312      * necessary for space to become available.
313      *
314      * @throws InterruptedException {@inheritDoc}
315      * @throws NullPointerException {@inheritDoc}
316      */
317     void put(E e) {
318         static if(is(E == class) || is(E == string)) {
319             if (e is null) throw new NullPointerException();
320         }
321         int c = 0;
322         Node!(E) node = new Node!(E)(e);
323         Mutex putLock = this.putLock;
324         putLock.lock();
325         try {
326             /*
327              * Note that count is used in wait guard even though it is
328              * not protected by lock. This works because count can
329              * only decrease at this point (all other puts are shut
330              * out by lock), and we (or some other waiting put) are
331              * signalled if it ever changes from capacity. Similarly
332              * for all other uses of count in other wait guards.
333              */
334             while (count == capacity) {
335                 notFull.wait();
336             }
337             enqueue(node);
338             c = AtomicHelper.getAndIncrement(count);
339             if (c + 1 < capacity)
340                 notFull.notify();
341         } finally {
342             putLock.unlock();
343         }
344         if (c == 0)
345             signalNotEmpty();
346     }
347 
348     /**
349      * Inserts the specified element at the tail of this queue, waiting if
350      * necessary up to the specified wait time for space to become available.
351      *
352      * @return {@code true} if successful, or {@code false} if
353      *         the specified waiting time elapses before space is available
354      * @throws InterruptedException {@inheritDoc}
355      * @throws NullPointerException {@inheritDoc}
356      */
357     bool offer(E e, Duration timeout) {
358         static if(is(E == class) || is(E == string)) {
359             if (e is null) throw new NullPointerException();
360         }
361 
362         int c = 0;
363         Mutex putLock = this.putLock;
364         putLock.lock();
365         try {
366             while (count == capacity) {
367                 // if (nanos <= 0L)
368                 //     return false;
369                 // nanos = notFull.wait(nanos);
370                 if(!notFull.wait(timeout)) return false;
371             }
372             enqueue(new Node!(E)(e));
373             c = AtomicHelper.getAndIncrement(count);
374             if (c + 1 < capacity)
375                 notFull.notify();
376         } finally {
377             putLock.unlock();
378         }
379         if (c == 0)
380             signalNotEmpty();
381         return true;
382     }
383 
384     /**
385      * Inserts the specified element at the tail of this queue if it is
386      * possible to do so immediately without exceeding the queue's capacity,
387      * returning {@code true} upon success and {@code false} if this queue
388      * is full.
389      * When using a capacity-restricted queue, this method is generally
390      * preferable to method {@link BlockingQueue#add add}, which can fail to
391      * insert an element only by throwing an exception.
392      *
393      * @throws NullPointerException if the specified element is null
394      */
395     bool offer(E e) {
396         static if(is(E == class) || is(E == string)) {
397             if (e is null) throw new NullPointerException();
398         }
399         // int count = this.count;
400         if (count  == capacity)
401             return false;
402         int c;
403         Node!(E) node = new Node!(E)(e);
404         Mutex putLock = this.putLock;
405         putLock.lock();
406         try {
407             if (count == capacity)
408                 return false;
409             enqueue(node);
410             c = AtomicHelper.getAndIncrement(count);
411             if (c + 1 < capacity)
412                 notFull.notify();
413         } finally {
414             putLock.unlock();
415         }
416         
417         if (c == 0)
418             signalNotEmpty();
419         return true;
420     }
421 
422     E take() {
423         E x;
424         int c;
425         Mutex takeLock = this.takeLock;
426         takeLock.lock();
427         try {
428             while (count == 0) {
429                 notEmpty.wait();
430             }
431             x = dequeue();
432             c = AtomicHelper.getAndDecrement(count);
433             if (c > 1)
434                 notEmpty.notify();
435         } finally {
436             takeLock.unlock();
437         }
438         if (c == capacity)
439             signalNotFull();
440         return x;
441     }
442 
443     E poll(Duration timeout) {
444         E x;
445         int c;
446         // int count = this.count;
447         Mutex takeLock = this.takeLock;
448         takeLock.lock();
449         try {
450             while (count == 0) {
451                 if(!notFull.wait(timeout)) return E.init;
452             }
453             x = dequeue();
454             c = AtomicHelper.getAndDecrement(count);
455             if (c > 1)
456                 notEmpty.notify();
457         } finally {
458             takeLock.unlock();
459         }
460         if (c == capacity)
461             signalNotFull();
462         return x;
463     }
464 
465     E poll() {
466         // int count = this.count;
467         if (count == 0)
468             throw new NoSuchElementException();
469 
470         E x;
471         int c;
472         Mutex takeLock = this.takeLock;
473         takeLock.lock();
474         try {
475             if (count == 0)
476                 return E.init;
477             x = dequeue();
478             c = AtomicHelper.getAndDecrement(count);
479             if (c > 1)
480                 notEmpty.notify();
481         } finally {
482             takeLock.unlock();
483         }
484         if (c == capacity)
485             signalNotFull();
486         return x;
487     }
488 
489     E peek() {
490         // if (atomicLoad(count) == 0)
491         //     return E.init;
492         
493         if (count == 0)
494             throw new NoSuchElementException();
495 
496         Mutex takeLock = this.takeLock;
497         takeLock.lock();
498         try {
499             return (count > 0) ? head.next.item : E.init;
500         } finally {
501             takeLock.unlock();
502         }
503     }
504 
505     /**
506      * Unlinks interior Node p with predecessor pred.
507      */
508     private void unlink(Node!(E) p, Node!(E) pred) {
509         // assert putLock.isHeldByCurrentThread();
510         // assert takeLock.isHeldByCurrentThread();
511         // p.next is not changed, to allow iterators that are
512         // traversing p to maintain their weak-consistency guarantee.
513         p.item = E.init;
514         pred.next = p.next;
515         if (last == p)
516             last = pred;
517         if (AtomicHelper.getAndDecrement(count) == capacity)
518             notFull.notify();
519     }
520 
521     /**
522      * Removes a single instance of the specified element from this queue,
523      * if it is present.  More formally, removes an element {@code e} such
524      * that {@code o.equals(e)}, if this queue contains one or more such
525      * elements.
526      * Returns {@code true} if this queue contained the specified element
527      * (or equivalently, if this queue changed as a result of the call).
528      *
529      * @param o element to be removed from this queue, if present
530      * @return {@code true} if this queue changed as a result of the call
531      */
532     override bool remove(E o) {
533         static if(is(E == class) || is(E == string)) {
534             if (o is null) return false;
535         }
536 
537         fullyLock();
538         try {
539             for (Node!(E) pred = head, p = pred.next;
540                  p !is null;
541                  pred = p, p = p.next) {
542                 if (o == p.item) {
543                     unlink(p, pred);
544                     return true;
545                 }
546             }
547             return false;
548         } finally {
549             fullyUnlock();
550         }
551     }
552 
553     /**
554      * Returns {@code true} if this queue contains the specified element.
555      * More formally, returns {@code true} if and only if this queue contains
556      * at least one element {@code e} such that {@code o.equals(e)}.
557      *
558      * @param o object to be checked for containment in this queue
559      * @return {@code true} if this queue contains the specified element
560      */
561     override bool contains(E o) {
562         static if(is(E == class) || is(E == string)) {
563             if (o is null) return false;
564         }
565         fullyLock();
566         try {
567             for (Node!(E) p = head.next; p !is null; p = p.next)
568                 if (o == p.item)
569                     return true;
570             return false;
571         } finally {
572             fullyUnlock();
573         }
574     }
575 
576     /**
577      * Returns an array containing all of the elements in this queue, in
578      * proper sequence.
579      *
580      * <p>The returned array will be "safe" in that no references to it are
581      * maintained by this queue.  (In other words, this method must allocate
582      * a new array).  The caller is thus free to modify the returned array.
583      *
584      * <p>This method acts as bridge between array-based and collection-based
585      * APIs.
586      *
587      * @return an array containing all of the elements in this queue
588      */
589     override E[] toArray() {
590         fullyLock();
591         try {
592             int size = count;
593             E[] a = new E[size];
594             int k = 0;
595             for (Node!(E) p = head.next; p !is null; p = p.next)
596                 a[k++] = p.item;
597             return a;
598         } finally {
599             fullyUnlock();
600         }
601     }
602 
603     /**
604      * Returns an array containing all of the elements in this queue, in
605      * proper sequence; the runtime type of the returned array is that of
606      * the specified array.  If the queue fits in the specified array, it
607      * is returned therein.  Otherwise, a new array is allocated with the
608      * runtime type of the specified array and the size of this queue.
609      *
610      * <p>If this queue fits in the specified array with room to spare
611      * (i.e., the array has more elements than this queue), the element in
612      * the array immediately following the end of the queue is set to
613      * {@code null}.
614      *
615      * <p>Like the {@link #toArray()} method, this method acts as bridge between
616      * array-based and collection-based APIs.  Further, this method allows
617      * precise control over the runtime type of the output array, and may,
618      * under certain circumstances, be used to save allocation costs.
619      *
620      * <p>Suppose {@code x} is a queue known to contain only strings.
621      * The following code can be used to dump the queue into a newly
622      * allocated array of {@code string}:
623      *
624      * <pre> {@code string[] y = x.toArray(new string[0]);}</pre>
625      *
626      * Note that {@code toArray(new Object[0])} is identical in function to
627      * {@code toArray()}.
628      *
629      * @param a the array into which the elements of the queue are to
630      *          be stored, if it is big enough; otherwise, a new array of the
631      *          same runtime type is allocated for this purpose
632      * @return an array containing all of the elements in this queue
633      * @throws ArrayStoreException if the runtime type of the specified array
634      *         is not a supertype of the runtime type of every element in
635      *         this queue
636      * @throws NullPointerException if the specified array is null
637      */
638 
639     // !(T) T[] toArray(T[] a) {
640     //     fullyLock();
641     //     try {
642     //         int size = atomicLoad(count);
643     //         if (a.length < size)
644     //             a = (T[])java.lang.reflect.Array.newInstance
645     //                 (a.getClass().getComponentType(), size);
646 
647     //         int k = 0;
648     //         for (Node!(E) p = head.next; p !is null; p = p.next)
649     //             a[k++] = (T)p.item;
650     //         if (a.length > k)
651     //             a[k] = null;
652     //         return a;
653     //     } finally {
654     //         fullyUnlock();
655     //     }
656     // }
657 
658     override string toString() {
659         return Helpers.collectionToString(this);
660     }
661 
662     /**
663      * Atomically removes all of the elements from this queue.
664      * The queue will be empty after this call returns.
665      */
666     override void clear() {
667         fullyLock();
668         try {
669             for (Node!(E) p, h = head; (p = h.next) !is null; h = p) {
670                 h.next = h;
671                 p.item = E.init;
672             }
673             head = last;
674             // assert head.item is null && head.next is null;
675             int c = count;
676             AtomicHelper.store(count, 0);
677             if (c == capacity)
678                 notFull.notify();
679         } finally {
680             fullyUnlock();
681         }
682     }
683 
684     /**
685      * @throws UnsupportedOperationException {@inheritDoc}
686      * @throws ClassCastException            {@inheritDoc}
687      * @throws NullPointerException          {@inheritDoc}
688      * @throws IllegalArgumentException      {@inheritDoc}
689      */
690     int drainTo(Collection!(E) c) {
691         return drainTo(c, int.max);
692     }
693 
694     /**
695      * @throws UnsupportedOperationException {@inheritDoc}
696      * @throws ClassCastException            {@inheritDoc}
697      * @throws NullPointerException          {@inheritDoc}
698      * @throws IllegalArgumentException      {@inheritDoc}
699      */
700     int drainTo(Collection!(E) c, int maxElements) {
701         // Objects.requireNonNull(c);
702         if (c == this)
703             throw new IllegalArgumentException();
704         if (maxElements <= 0)
705             return 0;
706         bool canSignalNotFull = false;
707         Mutex takeLock = this.takeLock;
708         takeLock.lock();
709         try {
710             import std.algorithm : min;
711             int n = min(maxElements, count);
712             // count.get provides visibility to first n Nodes
713             Node!(E) h = head;
714             int i = 0;
715             try {
716                 while (i < n) {
717                     Node!(E) p = h.next;
718                     c.add(p.item);
719                     p.item = E.init;
720                     h.next = h;
721                     h = p;
722                     ++i;
723                 }
724                 return n;
725             } finally {
726                 // Restore invariants even if c.add() threw
727                 if (i > 0) {
728                     // assert h.item is null;
729                     head = h;
730                     int ct = AtomicHelper.getAndAdd(count, i);
731                     canSignalNotFull = ((ct - i) == capacity);
732                 }
733             }
734         } finally {
735             takeLock.unlock();
736             if (canSignalNotFull)
737                 signalNotFull();
738         }
739     }
740 
741     /**
742      * Used for any element traversal that is not entirely under lock.
743      * Such traversals must handle both:
744      * - dequeued nodes (p.next == p)
745      * - (possibly multiple) interior removed nodes (p.item is null)
746      */
747     Node!(E) succ(Node!(E) p) {
748         if (p == (p = p.next))
749             p = head.next;
750         return p;
751     }
752 
753     override int opApply(scope int delegate(ref E) dg) {
754         if(dg is null)
755             throw new NullPointerException();
756 
757         return forEachFrom(dg, null);
758     }
759 
760     /**
761      * Runs action on each element found during a traversal starting at p.
762      * If p is null, traversal starts at head.
763      */
    private int forEachFrom(scope int delegate(ref E) action, Node!(E) p) {
        // Extract batches of elements while holding both locks; then
        // run the action on the elements while not holding them.
        // Returns the first non-zero value produced by action (which stops
        // the traversal), or 0 if every call returned 0.
        const int batchSize = 64;       // max number of elements per batch
        E[] es = null;             // container for batch of elements
        int n, len = 0;            // n: elements in current batch; len: batch capacity
        int result = 0;
        do {
            fullyLock();
            try {
                if (es is null) {
                    // First pass only: size the batch buffer by counting
                    // elements from the start node, capped at batchSize.
                    if (p is null) p = head.next;
                    for (Node!(E) q = p; q !is null; q = succ(q))
                        static if(is(E == class) || is(E == string)) {
                            // Nodes whose item is null were removed; do not count them.
                            if (q.item !is null && ++len == batchSize)
                                break;
                        } else {
                            // Value types have no null marker, so every node is counted.
                            if (++len == batchSize)
                                break;
                        }
                    es = new E[len];
                }
                // Copy up to len elements into the buffer, advancing p so the
                // next batch resumes where this one stopped.
                for (n = 0; p !is null && n < len; p = succ(p)) {
                    es[n] = p.item;
                    static if(is(E == class) || is(E == string)) {
                        // Keep the slot only if it holds a live element.
                        if (es[n] !is null)
                            n++;
                    } else {
                        n++;
                    }
                }
            } finally {
                fullyUnlock();
            }

            // Locks are released here: the callback runs on the copied batch.
            for (int i = 0; i < n; i++) {
                E e = es[i];
                result = action(e);
                if(result != 0) return result;
            }
        } while (n > 0 && p !is null);

        return result;
    }
808 
809     /**
810      * @throws NullPointerException {@inheritDoc}
811      */
812     override bool removeIf(Predicate!(E) filter) {
813         // Objects.requireNonNull(filter);
814         return bulkRemove(filter);
815     }
816 
817     /**
818      * @throws NullPointerException {@inheritDoc}
819      */
820     override bool removeAll(Collection!E c) {
821         // Objects.requireNonNull(c);
822         return bulkRemove(e => c.contains(e));
823     }
824 
825     /**
826      * @throws NullPointerException {@inheritDoc}
827      */
828     override bool retainAll(Collection!E c) {
829         // Objects.requireNonNull(c);
830         return bulkRemove(e => !c.contains(e));
831     }
832 
833     /**
834      * Returns the predecessor of live node p, given a node that was
835      * once a live ancestor of p (or head); allows unlinking of p.
836      */
837     Node!(E) findPred(Node!(E) p, Node!(E) ancestor) {
838         // assert p.item !is null;
839         static if(is(E == class) || is(E == string)) {
840             if (ancestor.item is null)
841                 ancestor = head;
842         }
843         // Fails with NPE if precondition not satisfied
844         for (Node!(E) q; (q = ancestor.next) != p; )
845             ancestor = q;
846         return ancestor;
847     }
848 
849     /** Implementation of bulk remove methods. */
850 
    // Removes every element accepted by filter, working in batches so that
    // the filter itself runs without any lock held.
    // Returns true if at least one element was unlinked.
    private bool bulkRemove(Predicate!(E) filter) {
        bool removed = false;
        Node!(E) p = null, ancestor = head;
        Node!(E)[] nodes = null;
        int n, len = 0;            // n: nodes in current batch; len: batch capacity
        do {
            // 1. Extract batch of up to 64 elements while holding the lock.
            fullyLock();
            try {
                if (nodes is null) {  // first batch; initialize
                    p = head.next;
                    // Size the batch buffer: count nodes, capped at 64.
                    for (Node!(E) q = p; q !is null; q = succ(q)) {
                        static if(is(E == class) || is(E == string)) {
                            // Nodes whose item is null were removed; do not count them.
                            if (q.item !is null && ++len == 64)
                                break;
                        } else {
                            if (++len == 64)
                                break;
                        }
                    }
                    nodes = new Node!(E)[len];
                }
                // Collect up to len nodes, advancing p so the next batch
                // resumes where this one stopped.
                for (n = 0; p !is null && n < len; p = succ(p))
                    nodes[n++] = p;
            } finally {
                fullyUnlock();
            }

            // 2. Run the filter on the elements while lock is free.
            long deathRow = 0L;       // "bitset" of size 64
            for (int i = 0; i < n; i++) {
                E e = nodes[i].item;
                static if(is(E == class) || is(E == string)) {
                    // Skip nodes that were emptied since the batch was collected.
                    if (e !is null && filter(e)) deathRow |= 1L << i;
                } else {
                    if (filter(e)) deathRow |= 1L << i;
                }
            }

            // 3. Remove any filtered elements while holding the lock.
            if (deathRow != 0) {
                fullyLock();
                try {
                    for (int i = 0; i < n; i++) {
                        Node!(E) q;
                        static if(is(E == class) || is(E == string)) {
                            // Re-check the item under the lock: the node may
                            // have been removed while the filter was running.
                            if ((deathRow & (1L << i)) != 0L
                                && (q = nodes[i]).item !is null) {
                                ancestor = findPred(q, ancestor);
                                unlink(q, ancestor);
                                removed = true;
                            }
                        } else {
                            // NOTE(review): value types have no null marker, so a
                            // node removed by another thread during step 2 is not
                            // detected here and findPred may fail to find it.
                            // Confirm whether concurrent removal is possible for
                            // value-type queues.
                            if ((deathRow & (1L << i)) != 0L) {
                                q = nodes[i];
                                ancestor = findPred(q, ancestor);
                                unlink(q, ancestor);
                                removed = true;
                            }
                        }
                        nodes[i] = null; // help GC
                    }
                } finally {
                    fullyUnlock();
                }
            }
        } while (n > 0 && p !is null);
        return removed;
    }
920 
921     override bool opEquals(IObject o) {
922         return opEquals(cast(Object) o);
923     }
924     
925     override bool opEquals(Object o) {
926         return super.opEquals(o);
927     }
928 
929     override size_t toHash() @trusted nothrow {
930         return super.toHash();
931     }
932 
933 }