/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.concurrency.LinkedBlockingQueue;

import hunt.concurrency.atomic.AtomicHelper;
import hunt.concurrency.Helpers;
import hunt.concurrency.BlockingQueue;

import hunt.collection.AbstractQueue;
import hunt.collection.Collection;
import hunt.collection.Iterator;
import hunt.util.DateTime;
import hunt.Exceptions;
import hunt.Functions;
import hunt.Object;

// import core.atomic;
import core.sync.mutex;
import core.sync.condition;
import core.time;

import hunt.logging.ConsoleLogger;

/**
 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
 * linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 * Linked queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
 *
 * <p>The optional capacity bound constructor argument serves as a
 * way to prevent excessive queue expansion. The capacity, if unspecified,
 * is equal to {@code int.max}. Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
 *
 * <p>This class is a D port of {@code java.util.concurrent.LinkedBlockingQueue}.
 *
 * @author Doug Lea
 * @param (E) the type of elements held in this queue
 */
class LinkedBlockingQueue(E) : AbstractQueue!(E), BlockingQueue!(E) {

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(Object) and
     * iterators acquire both locks.
     *
     * Visibility between writers and readers is provided as follows:
     *
     * Whenever an element is enqueued, the putLock is acquired and
     * count updated.  A subsequent reader guarantees visibility to the
     * enqueued Node by either acquiring the putLock (via fullyLock)
     * or by acquiring the takeLock, and then reading n = atomicLoad(count);
     * this gives visibility to the first n items.
     *
     * To implement weakly consistent iterators, it appears we need to
     * keep all Nodes GC-reachable from a predecessor dequeued Node.
     * That would cause two problems:
     * - allow a rogue Iterator to cause unbounded memory retention
     * - cause cross-generational linking of old Nodes to new Nodes if
     *   a Node was tenured while live, which generational GCs have a
     *   hard time dealing with, causing repeated major collections.
     * However, only non-deleted Nodes need to be reachable from
     * dequeued Nodes, and reachability does not necessarily have to
     * be of the kind understood by the GC.  We use the trick of
     * linking a Node that has just been dequeued to itself.  Such a
     * self-link implicitly means to advance to head.next.
     */

    /**
     * Linked list node class.
     */
    static class Node(E) {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node!(E) next;

        this(E x) { item = x; }
    }

    /** The capacity bound, or int.max if none */
    private int capacity;

    /** Current number of elements */
    private shared(int) count;

    /**
     * Head of linked list.
     * Invariant: head.item is E.init (the head is a dummy node)
     */
    private Node!(E) head;

    /**
     * Tail of linked list.
     * Invariant: last.next is null
     */
    private Node!(E) last;

    /** Lock held by take, poll, etc */
    private Mutex takeLock;

    /** Wait queue for waiting takes; bound to takeLock */
    private Condition notEmpty;

    /** Lock held by put, offer, etc */
    private Mutex putLock;

    /** Wait queue for waiting puts; bound to putLock */
    private Condition notFull;

    /**
     * Creates the two locks and the condition variable attached to each.
     */
    private void initialize() {
        takeLock = new Mutex();
        putLock = new Mutex();
        notEmpty = new Condition(takeLock);
        notFull = new Condition(putLock);
    }

    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        takeLock.lock();
        scope (exit) takeLock.unlock();
        notEmpty.notify();
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        putLock.lock();
        scope (exit) putLock.unlock();
        notFull.notify();
    }

    /**
     * Links node at end of queue. The caller must hold putLock.
     *
     * @param node the node
     */
    private void enqueue(Node!(E) node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next is null;
        last = last.next = node;
    }

    /**
     * Removes a node from head of queue. The caller must hold takeLock
     * and must have checked that the queue is not empty.
     *
     * @return the item that was stored in the removed node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item is null;
        Node!(E) h = head;
        Node!(E) first = h.next;
        h.next = h; // self-link: marks h as dequeued and helps GC
        head = first; // the former first node becomes the new dummy head
        E x = first.item;
        first.item = E.init;
        return x;
    }

    /**
     * Locks to prevent both puts and takes.
     * putLock is always taken before takeLock.
     */
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlocks to allow both puts and takes (reverse order of fullyLock).
     */
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@code int.max}.
     */
    this() {
        this(int.max);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    this(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node!(E)(E.init);
        initialize();
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@code int.max}, initially containing the elements of the
     * given collection, added in the collection's iteration order.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if E is a class type or string and
     *         any element of the collection is null
     * @throws IllegalStateException if the collection holds more
     *         elements than the capacity
     */
    this(Collection!(E) c) {
        this(int.max);
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            foreach (E e; c) {
                static if (is(E == class) || is(E == string)) {
                    if (e is null) throw new NullPointerException();
                }
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node!(E)(e));
                ++n;
            }
            count = n;
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    override int size() {
        return count;
    }

    /**
     * Returns the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current {@code size} of this queue.
     *
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting {@code remainingCapacity}
     * because it may be the case that another thread is about to
     * insert or remove an element.
     */
    int remainingCapacity() {
        return capacity - count;
    }

    /** Delegates to the inherited {@code add}. */
    override bool add(E e) {
        return super.add(e);
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @param e the element to add
     * @throws NullPointerException if E is a class type or string and
     *         {@code e} is null
     */
    void put(E e) {
        static if (is(E == class) || is(E == string)) {
            if (e is null) throw new NullPointerException();
        }
        int c = 0;
        Node!(E) node = new Node!(E)(e);
        putLock.lock();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count == capacity) {
                notFull.wait();
            }
            enqueue(node);
            c = AtomicHelper.getAndIncrement(count);
            // Cascade: there is still room, so wake the next waiting put.
            if (c + 1 < capacity)
                notFull.notify();
        } finally {
            putLock.unlock();
        }
        // The queue was empty before this put; a take may be waiting.
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     *
     * <p>The timeout bounds the total time spent waiting: the remaining
     * time is recomputed before every wait, so repeated wake-ups while
     * the queue is still full do not restart the full timeout.
     *
     * @param e the element to add
     * @param timeout how long to wait before giving up
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available
     * @throws NullPointerException if E is a class type or string and
     *         {@code e} is null
     */
    bool offer(E e, Duration timeout) {
        static if (is(E == class) || is(E == string)) {
            if (e is null) throw new NullPointerException();
        }

        import core.time : MonoTime;

        int c = 0;
        putLock.lock();
        try {
            const MonoTime start = MonoTime.currTime;
            while (count == capacity) {
                const Duration elapsed = MonoTime.currTime - start;
                // Also covers zero and negative timeouts (elapsed is never negative).
                if (elapsed >= timeout)
                    return false;
                notFull.wait(timeout - elapsed);
            }
            enqueue(new Node!(E)(e));
            c = AtomicHelper.getAndIncrement(count);
            if (c + 1 < capacity)
                notFull.notify();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.
     * When using a capacity-restricted queue, this method is generally
     * preferable to method {@link BlockingQueue#add add}, which can fail to
     * insert an element only by throwing an exception.
     *
     * @param e the element to add
     * @throws NullPointerException if E is a class type or string and
     *         {@code e} is null
     */
    bool offer(E e) {
        static if (is(E == class) || is(E == string)) {
            if (e is null) throw new NullPointerException();
        }
        // Cheap unlocked check first; it is repeated under the lock below.
        if (count == capacity)
            return false;
        int c;
        Node!(E) node = new Node!(E)(e);
        putLock.lock();
        try {
            if (count == capacity)
                return false;
            enqueue(node);
            c = AtomicHelper.getAndIncrement(count);
            if (c + 1 < capacity)
                notFull.notify();
        } finally {
            putLock.unlock();
        }

        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     *
     * @return the head of this queue
     */
    E take() {
        E x;
        int c;
        takeLock.lock();
        try {
            while (count == 0) {
                notEmpty.wait();
            }
            x = dequeue();
            c = AtomicHelper.getAndDecrement(count);
            // Cascade: more elements remain, so wake the next waiting take.
            if (c > 1)
                notEmpty.notify();
        } finally {
            takeLock.unlock();
        }
        // The queue was full before this take; a put may be waiting.
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if necessary for an element to become available.
     *
     * <p>The wait is performed on {@code notEmpty}, the condition bound to
     * {@code takeLock}, and the timeout bounds the total time spent waiting.
     *
     * @param timeout how long to wait before giving up
     * @return the head of this queue, or {@code E.init} if the
     *         specified waiting time elapses before an element is available
     */
    E poll(Duration timeout) {
        import core.time : MonoTime;

        E x;
        int c;
        takeLock.lock();
        try {
            const MonoTime start = MonoTime.currTime;
            while (count == 0) {
                const Duration elapsed = MonoTime.currTime - start;
                // Also covers zero and negative timeouts (elapsed is never negative).
                if (elapsed >= timeout)
                    return E.init;
                notEmpty.wait(timeout - elapsed);
            }
            x = dequeue();
            c = AtomicHelper.getAndDecrement(count);
            if (c > 1)
                notEmpty.notify();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * NOTE(review): the unlocked emptiness check throws, while the
     * re-check under the lock returns {@code E.init}; callers therefore
     * have to handle both outcomes. Confirm which contract is intended.
     *
     * @return the head of this queue, or {@code E.init} if the queue was
     *         drained between the unlocked check and acquiring the lock
     * @throws NoSuchElementException if the queue is observed empty
     *         before the lock is taken
     */
    E poll() {
        if (count == 0)
            throw new NoSuchElementException();

        E x;
        int c;
        takeLock.lock();
        try {
            if (count == 0)
                return E.init;
            x = dequeue();
            c = AtomicHelper.getAndDecrement(count);
            if (c > 1)
                notEmpty.notify();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * NOTE(review): as with {@code poll()}, the unlocked check throws
     * while the locked re-check returns {@code E.init}.
     *
     * @return the head of this queue, or {@code E.init} if the queue was
     *         drained between the unlocked check and acquiring the lock
     * @throws NoSuchElementException if the queue is observed empty
     *         before the lock is taken
     */
    E peek() {
        if (count == 0)
            throw new NoSuchElementException();

        takeLock.lock();
        try {
            return (count > 0) ? head.next.item : E.init;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Unlinks interior Node p with predecessor pred.
     * The caller must hold both locks.
     */
    private void unlink(Node!(E) p, Node!(E) pred) {
        // assert putLock.isHeldByCurrentThread();
        // assert takeLock.isHeldByCurrentThread();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        p.item = E.init;
        pred.next = p.next;
        if (last is p) // identity comparison, not opEquals
            last = pred;
        if (AtomicHelper.getAndDecrement(count) == capacity)
            notFull.notify();
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present. Elements are compared with {@code ==}; the first
     * matching element in queue order is removed.
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    override bool remove(E o) {
        static if (is(E == class) || is(E == string)) {
            if (o is null) return false;
        }

        fullyLock();
        try {
            for (Node!(E) pred = head, p = pred.next;
                    p !is null;
                    pred = p, p = p.next) {
                if (o == p.item) {
                    unlink(p, pred);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns {@code true} if this queue contains the specified element,
     * i.e. at least one element {@code e} such that {@code o == e}.
     *
     * @param o object to be checked for containment in this queue
     * @return {@code true} if this queue contains the specified element
     */
    override bool contains(E o) {
        static if (is(E == class) || is(E == string)) {
            if (o is null) return false;
        }
        fullyLock();
        try {
            for (Node!(E) p = head.next; p !is null; p = p.next) {
                if (o == p.item)
                    return true;
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * <p>The returned array is newly allocated on every call, so the
     * caller is free to modify it.
     *
     * @return an array containing all of the elements in this queue
     */
    override E[] toArray() {
        fullyLock();
        try {
            int size = count;
            E[] a = new E[size];
            int k = 0;
            for (Node!(E) p = head.next; p !is null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /*
     * The typed toArray(T[] a) overload of the Java original has not been
     * ported. The untranslated body is kept here for reference:
     *
     * !(T) T[] toArray(T[] a) {
     *     fullyLock();
     *     try {
     *         int size = atomicLoad(count);
     *         if (a.length < size)
     *             a = (T[])java.lang.reflect.Array.newInstance
     *                 (a.getClass().getComponentType(), size);
     *
     *         int k = 0;
     *         for (Node!(E) p = head.next; p !is null; p = p.next)
     *             a[k++] = (T)p.item;
     *         if (a.length > k)
     *             a[k] = null;
     *         return a;
     *     } finally {
     *         fullyUnlock();
     *     }
     * }
     */

    /** Returns the string form produced by Helpers.collectionToString. */
    override string toString() {
        return Helpers.collectionToString(this);
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    override void clear() {
        fullyLock();
        try {
            // Self-link every node that is dropped and blank its successor's item.
            for (Node!(E) p, h = head; (p = h.next) !is null; h = p) {
                h.next = h;
                p.item = E.init;
            }
            head = last;
            // assert head.item is null && head.next is null;
            int c = count;
            AtomicHelper.store(count, 0);
            if (c == capacity)
                notFull.notify();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Removes all available elements from this queue and adds them to
     * the given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    int drainTo(Collection!(E) c) {
        return drainTo(c, int.max);
    }

    /**
     * Removes at most the given number of available elements from this
     * queue and adds them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    int drainTo(Collection!(E) c, int maxElements) {
        // Objects.requireNonNull(c);
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        bool canSignalNotFull = false;
        takeLock.lock();
        try {
            import std.algorithm : min;
            int n = min(maxElements, count);
            // reading count provides visibility to first n Nodes
            Node!(E) h = head;
            int i = 0;
            try {
                while (i < n) {
                    Node!(E) p = h.next;
                    c.add(p.item);
                    p.item = E.init;
                    h.next = h;
                    h = p;
                    ++i;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                    // assert h.item is null;
                    head = h;
                    // i elements left the queue, so the count goes DOWN by i.
                    // Assumes getAndAdd returns the previous value, like the
                    // getAndIncrement/getAndDecrement calls in this class.
                    canSignalNotFull =
                        (AtomicHelper.getAndAdd(count, -i) == capacity);
                }
            }
        } finally {
            takeLock.unlock();
            if (canSignalNotFull)
                signalNotFull();
        }
    }

    /**
     * Used for any element traversal that is not entirely under lock.
     * Such traversals must handle both:
     * - dequeued nodes (p.next is p)
     * - (possibly multiple) interior removed nodes (p.item is null)
     *
     * @param p the node to advance from
     * @return the successor of {@code p}, or {@code head.next} when
     *         {@code p} is self-linked
     */
    Node!(E) succ(Node!(E) p) {
        Node!(E) next = p.next;
        // A self-linked node was dequeued: restart from the current first node.
        return (next is p) ? head.next : next;
    }

    /**
     * Supports {@code foreach} over the queue.
     *
     * @param dg the loop body
     * @return the first non-zero value returned by {@code dg}, else 0
     * @throws NullPointerException if {@code dg} is null
     */
    override int opApply(scope int delegate(ref E) dg) {
        if (dg is null)
            throw new NullPointerException();

        return forEachFrom(dg, null);
    }

    /**
     * Runs action on each element found during a traversal starting at p.
     * If p is null, traversal starts at head.
     *
     * @return the first non-zero value returned by {@code action}, else 0
     */
    private int forEachFrom(scope int delegate(ref E) action, Node!(E) p) {
        // Extract batches of elements while holding the lock; then
        // run the action on the elements while not
        const int batchSize = 64;       // max number of elements per batch
        E[] es = null;                  // container for batch of elements
        int n, len = 0;
        int result = 0;
        do {
            fullyLock();
            try {
                if (es is null) {
                    // First pass: size the batch buffer (at most batchSize).
                    if (p is null) p = head.next;
                    for (Node!(E) q = p; q !is null; q = succ(q)) {
                        static if (is(E == class) || is(E == string)) {
                            if (q.item !is null && ++len == batchSize)
                                break;
                        } else {
                            if (++len == batchSize)
                                break;
                        }
                    }
                    es = new E[len];
                }
                for (n = 0; p !is null && n < len; p = succ(p)) {
                    es[n] = p.item;
                    static if (is(E == class) || is(E == string)) {
                        // Skip nodes whose item was cleared by a removal.
                        if (es[n] !is null)
                            n++;
                    } else {
                        n++;
                    }
                }
            } finally {
                fullyUnlock();
            }

            for (int i = 0; i < n; i++) {
                E e = es[i];
                result = action(e);
                if (result != 0) return result;
            }
        } while (n > 0 && p !is null);

        return result;
    }

    /**
     * Removes every element for which {@code filter} returns true.
     *
     * @return {@code true} if any element was removed
     */
    override bool removeIf(Predicate!(E) filter) {
        // Objects.requireNonNull(filter);
        return bulkRemove(filter);
    }

    /**
     * Removes every element that is contained in {@code c}.
     *
     * @return {@code true} if any element was removed
     */
    override bool removeAll(Collection!E c) {
        // Objects.requireNonNull(c);
        return bulkRemove(e => c.contains(e));
    }

    /**
     * Removes every element that is not contained in {@code c}.
     *
     * @return {@code true} if any element was removed
     */
    override bool retainAll(Collection!E c) {
        // Objects.requireNonNull(c);
        return bulkRemove(e => !c.contains(e));
    }

    /**
     * Returns the predecessor of live node p, given a node that was
     * once a live ancestor of p (or head); allows unlinking of p.
     */
    Node!(E) findPred(Node!(E) p, Node!(E) ancestor) {
        // assert p.item !is null;
        static if (is(E == class) || is(E == string)) {
            // A cleared item means the ancestor was removed; restart at head.
            if (ancestor.item is null)
                ancestor = head;
        }
        // Fails with a null dereference if precondition not satisfied
        for (Node!(E) q; (q = ancestor.next) !is p; )
            ancestor = q;
        return ancestor;
    }

    /**
     * Implementation of bulk remove methods.
     *
     * NOTE(review): for value-type E a node removed by another thread
     * between phases 1 and 3 cannot be detected (there is no null item to
     * test), so findPred's precondition may not hold in that case.
     *
     * @return {@code true} if any element was removed
     */
    private bool bulkRemove(Predicate!(E) filter) {
        bool removed = false;
        Node!(E) p = null, ancestor = head;
        Node!(E)[] nodes = null;
        int n, len = 0;
        do {
            // 1. Extract batch of up to 64 elements while holding the lock.
            fullyLock();
            try {
                if (nodes is null) {  // first batch; initialize
                    p = head.next;
                    for (Node!(E) q = p; q !is null; q = succ(q)) {
                        static if (is(E == class) || is(E == string)) {
                            if (q.item !is null && ++len == 64)
                                break;
                        } else {
                            if (++len == 64)
                                break;
                        }
                    }
                    nodes = new Node!(E)[len];
                }
                for (n = 0; p !is null && n < len; p = succ(p))
                    nodes[n++] = p;
            } finally {
                fullyUnlock();
            }

            // 2. Run the filter on the elements while lock is free.
            long deathRow = 0L;       // "bitset" of size 64
            for (int i = 0; i < n; i++) {
                E e = nodes[i].item;
                static if (is(E == class) || is(E == string)) {
                    if (e !is null && filter(e)) deathRow |= 1L << i;
                } else {
                    if (filter(e)) deathRow |= 1L << i;
                }
            }

            // 3. Remove any filtered elements while holding the lock.
            if (deathRow != 0) {
                fullyLock();
                try {
                    for (int i = 0; i < n; i++) {
                        Node!(E) q;
                        static if (is(E == class) || is(E == string)) {
                            if ((deathRow & (1L << i)) != 0L
                                    && (q = nodes[i]).item !is null) {
                                ancestor = findPred(q, ancestor);
                                unlink(q, ancestor);
                                removed = true;
                            }
                        } else {
                            if ((deathRow & (1L << i)) != 0L) {
                                q = nodes[i];
                                ancestor = findPred(q, ancestor);
                                unlink(q, ancestor);
                                removed = true;
                            }
                        }
                        nodes[i] = null; // help GC
                    }
                } finally {
                    fullyUnlock();
                }
            }
        } while (n > 0 && p !is null);
        return removed;
    }

    /** Forwards to the Object overload of opEquals. */
    override bool opEquals(IObject o) {
        return opEquals(cast(Object) o);
    }

    /** Delegates to the inherited opEquals. */
    override bool opEquals(Object o) {
        return super.opEquals(o);
    }

    /** Delegates to the inherited toHash. */
    override size_t toHash() @trusted nothrow {
        return super.toHash();
    }

}