/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.concurrency.AbstractQueuedSynchronizer;

import hunt.concurrency.AbstractOwnableSynchronizer;
import hunt.concurrency.atomic.AtomicHelper;

import hunt.collection.ArrayList;
import hunt.collection.Collection;
import hunt.util.DateTime;
import hunt.Exceptions;

import core.thread;
import core.sync.mutex;
import core.sync.condition;

import std.conv;
import std.datetime;


/**
 * Provides a framework for implementing blocking locks and related
 * synchronizers (semaphores, events, etc.) that rely on
 * first-in-first-out (FIFO) wait queues. This class is designed to
 * be a useful basis for most kinds of synchronizers that rely on a
 * single atomic {@code int} value to represent state. Subclasses
 * must define the protected methods that change this state, and which
 * define what that state means in terms of this object being acquired
 * or released. Given these, the other methods in this class carry
 * out all queuing and blocking mechanics. Subclasses can maintain
 * other state fields, but only the atomically updated {@code int}
 * value manipulated using methods {@link #getState}, {@link
 * #setState} and {@link #compareAndSetState} is tracked with respect
 * to synchronization.
 *
 * <p>Subclasses should be defined as non-public internal helper
 * classes that are used to implement the synchronization properties
 * of their enclosing class. Class
 * {@code AbstractQueuedSynchronizer} does not implement any
 * synchronization interface. Instead it defines methods such as
 * {@link #acquireInterruptibly} that can be invoked as
 * appropriate by concrete locks and related synchronizers to
 * implement their methods.
 *
 * <p>This class supports either or both a default <em>exclusive</em>
 * mode and a <em>shared</em> mode. When acquired in exclusive mode,
 * attempted acquires by other threads cannot succeed. Shared mode
 * acquires by multiple threads may (but need not) succeed. This class
 * does not "understand" these differences except in the
 * mechanical sense that when a shared mode acquire succeeds, the next
 * waiting thread (if one exists) must also determine whether it can
 * acquire as well. Threads waiting in the different modes share the
 * same FIFO queue. Usually, implementation subclasses support only
 * one of these modes, but both can come into play for example in a
 * {@link ReadWriteLock}. Subclasses that support only exclusive or
 * only shared modes need not define the methods supporting the unused mode.
 *
 * <p>This class defines a nested {@link ConditionObject} class that
 * can be used as a {@link Condition} implementation by subclasses
 * supporting exclusive mode for which method {@link
 * #isHeldExclusively} reports whether synchronization is exclusively
 * held with respect to the current thread, method {@link #release}
 * invoked with the current {@link #getState} value fully releases
 * this object, and {@link #acquire}, given this saved state value,
 * eventually restores this object to its previous acquired state. No
 * {@code AbstractQueuedSynchronizer} method otherwise creates such a
 * condition, so if this constraint cannot be met, do not use it. The
 * behavior of {@link ConditionObject} depends of course on the
 * semantics of its synchronizer implementation.
 *
 * <p>This class provides inspection, instrumentation, and monitoring
 * methods for the internal queue, as well as similar methods for
 * condition objects. These can be exported as desired into classes
 * using an {@code AbstractQueuedSynchronizer} for their
 * synchronization mechanics.
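 *
 * <p>For example, an enclosing lock class might simply forward such
 * inspection queries to its internal synchronizer helper (a minimal
 * sketch; the {@code sync} field is the hypothetical helper instance
 * of the kind used in the examples below):
 *
 * <pre> {@code
 * // Rough indications of contention, intended for monitoring only
 * int getQueueLength() { return sync.getQueueLength(); }
 * bool hasQueuedThreads() { return sync.hasQueuedThreads(); }
 * }</pre>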
 *
 * <p>Serialization of this class stores only the underlying atomic
 * integer maintaining state, so deserialized objects have empty
 * thread queues. Typical subclasses requiring serializability will
 * define a {@code readObject} method that restores this to a known
 * initial state upon deserialization.
 *
 * <h3>Usage</h3>
 *
 * <p>To use this class as the basis of a synchronizer, redefine the
 * following methods, as applicable, by inspecting and/or modifying
 * the synchronization state using {@link #getState}, {@link
 * #setState} and/or {@link #compareAndSetState}:
 *
 * <ul>
 * <li>{@link #tryAcquire}
 * <li>{@link #tryRelease}
 * <li>{@link #tryAcquireShared}
 * <li>{@link #tryReleaseShared}
 * <li>{@link #isHeldExclusively}
 * </ul>
 *
 * Each of these methods by default throws {@link
 * UnsupportedOperationException}. Implementations of these methods
 * must be internally thread-safe, and should in general be short and
 * not block. Defining these methods is the <em>only</em> supported
 * means of using this class. All other methods are declared
 * {@code final} because they cannot be independently varied.
 *
 * <p>You may also find the inherited methods from {@link
 * AbstractOwnableSynchronizer} useful to keep track of the thread
 * owning an exclusive synchronizer. You are encouraged to use them
 * -- this enables monitoring and diagnostic tools to assist users in
 * determining which threads hold locks.
 *
 * <p>Even though this class is based on an internal FIFO queue, it
 * does not automatically enforce FIFO acquisition policies.
 * The core of exclusive synchronization takes the form:
 *
 * <pre>
 * Acquire:
 *     while (!tryAcquire(arg)) {
 *         <em>enqueue thread if it is not already queued</em>;
 *         <em>possibly block current thread</em>;
 *     }
 *
 * Release:
 *     if (tryRelease(arg))
 *         <em>unblock the first queued thread</em>;
 * </pre>
 *
 * (Shared mode is similar but may involve cascading signals.)
 *
 * <p id="barging">Because checks in acquire are invoked before
 * enqueuing, a newly acquiring thread may <em>barge</em> ahead of
 * others that are blocked and queued. However, you can, if desired,
 * define {@code tryAcquire} and/or {@code tryAcquireShared} to
 * disable barging by internally invoking one or more of the inspection
 * methods, thereby providing a <em>fair</em> FIFO acquisition order.
 * In particular, most fair synchronizers can define {@code tryAcquire}
 * to return {@code false} if {@link #hasQueuedPredecessors} (a method
 * specifically designed to be used by fair synchronizers) returns
 * {@code true}. Other variations are possible.
 *
 * <p>Throughput and scalability are generally highest for the
 * default barging (also known as <em>greedy</em>,
 * <em>renouncement</em>, and <em>convoy-avoidance</em>) strategy.
 * While this is not guaranteed to be fair or starvation-free, earlier
 * queued threads are allowed to recontend before later queued
 * threads, and each recontention has an unbiased chance to succeed
 * against incoming threads. Also, while acquires do not
 * "spin" in the usual sense, they may perform multiple
 * invocations of {@code tryAcquire} interspersed with other
 * computations before blocking. This gives most of the benefits of
 * spins when exclusive synchronization is only briefly held, without
 * most of the liabilities when it isn't.
 * If so desired, you can
 * augment this by preceding calls to acquire methods with
 * "fast-path" checks, possibly prechecking {@link #hasContended}
 * and/or {@link #hasQueuedThreads} to only do so if the synchronizer
 * is likely not to be contended.
 *
 * <p>This class provides an efficient and scalable basis for
 * synchronization in part by specializing its range of use to
 * synchronizers that can rely on {@code int} state, acquire, and
 * release parameters, and an internal FIFO wait queue. When this does
 * not suffice, you can build synchronizers from a lower level using
 * {@link hunt.concurrency.atomic atomic} classes, your own custom
 * {@link java.util.Queue} classes, and {@link LockSupport} blocking
 * support.
 *
 * <h3>Usage Examples</h3>
 *
 * <p>Here is a non-reentrant mutual exclusion lock class that uses
 * the value zero to represent the unlocked state, and one to
 * represent the locked state. While a non-reentrant lock
 * does not strictly require recording of the current owner
 * thread, this class does so anyway to make usage easier to monitor.
 * It also supports conditions and exposes some instrumentation methods:
 *
 * <pre> {@code
 * class Mutex : Lock {
 *
 *     // Our internal helper class
 *     private static class Sync : AbstractQueuedSynchronizer {
 *         // Acquires the lock if state is zero
 *         override protected bool tryAcquire(int acquires) {
 *             assert(acquires == 1); // Otherwise unused
 *             if (compareAndSetState(0, 1)) {
 *                 setExclusiveOwnerThread(Thread.getThis());
 *                 return true;
 *             }
 *             return false;
 *         }
 *
 *         // Releases the lock by setting state to zero
 *         override protected bool tryRelease(int releases) {
 *             assert(releases == 1); // Otherwise unused
 *             if (!isHeldExclusively())
 *                 throw new IllegalMonitorStateException();
 *             setExclusiveOwnerThread(null);
 *             setState(0);
 *             return true;
 *         }
 *
 *         // Reports whether in locked state
 *         bool isLocked() {
 *             return getState() != 0;
 *         }
 *
 *         override protected bool isHeldExclusively() {
 *             // a data race, but safe due to out-of-thin-air guarantees
 *             return getExclusiveOwnerThread() == Thread.getThis();
 *         }
 *
 *         // Provides a Condition
 *         Condition newCondition() {
 *             return new ConditionObject();
 *         }
 *
 *         // Deserializes properly
 *         private void readObject(ObjectInputStream s) {
 *             s.defaultReadObject();
 *             setState(0); // reset to unlocked state
 *         }
 *     }
 *
 *     // The sync object does all the hard work. We just forward to it.
 *     private Sync sync = new Sync();
 *
 *     void lock() { sync.acquire(1); }
 *     bool tryLock() { return sync.tryAcquire(1); }
 *     void unlock() { sync.release(1); }
 *     Condition newCondition() { return sync.newCondition(); }
 *     bool isLocked() { return sync.isLocked(); }
 *     bool isHeldByCurrentThread() {
 *         return sync.isHeldExclusively();
 *     }
 *     bool hasQueuedThreads() {
 *         return sync.hasQueuedThreads();
 *     }
 *     void lockInterruptibly() {
 *         sync.acquireInterruptibly(1);
 *     }
 *     bool tryLock(Duration timeout) {
 *         return sync.tryAcquireNanos(1, timeout.total!"nsecs"());
 *     }
 * }}</pre>
 *
 * <p>Here is a latch class that is like a
 * {@link hunt.concurrency.CountDownLatch CountDownLatch}
 * except that it only requires a single {@code signal} to
 * fire. Because a latch is non-exclusive, it uses the {@code shared}
 * acquire and release methods.
 *
 * <pre> {@code
 * class BooleanLatch {
 *
 *     private static class Sync : AbstractQueuedSynchronizer {
 *         bool isSignalled() { return getState() != 0; }
 *
 *         override protected int tryAcquireShared(int ignore) {
 *             return isSignalled() ? 1 : -1;
 *         }
 *
 *         override protected bool tryReleaseShared(int ignore) {
 *             setState(1);
 *             return true;
 *         }
 *     }
 *
 *     private Sync sync = new Sync();
 *     bool isSignalled() { return sync.isSignalled(); }
 *     void signal() { sync.releaseShared(1); }
 *     void await() {
 *         sync.acquireSharedInterruptibly(1);
 *     }
 * }}</pre>
 *
 * @author Doug Lea
 */
abstract class AbstractQueuedSynchronizer : AbstractOwnableSynchronizer {


    /**
     * Creates a new {@code AbstractQueuedSynchronizer} instance
     * with initial synchronization state of zero.
     */
    protected this() { }

    /**
     * Head of the wait queue, lazily initialized. Except for
     * initialization, it is modified only via method setHead.
     * Note: If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private Node head;

    /**
     * Tail of the wait queue, lazily initialized. Modified only via
     * method enq to add new wait node.
     */
    private Node tail;

    /**
     * The synchronization state.
     */
    private int state;

    /**
     * Returns the current value of synchronization state.
     * This operation has memory semantics of a {@code volatile} read.
     * @return current state value
     */
    protected final int getState() {
        return state;
    }

    /**
     * Sets the value of synchronization state.
     * This operation has memory semantics of a {@code volatile} write.
     * @param newState the new state value
     */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a {@code volatile} read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final bool compareAndSetState(int expect, int update) {
        return AtomicHelper.compareAndSet(state, expect, update);
    }

    // Queuing utilities

    /**
     * The number of nanoseconds for which it is faster to spin
     * rather than to use timed park. A rough estimate suffices
     * to improve responsiveness with very short timeouts.
     */
    enum long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(Node node) {
        for (;;) {
            Node oldTail = tail;
            if (oldTail !is null) {
                node.setPrevRelaxed(oldTail);
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return oldTail;
                }
            } else {
                initializeSyncQueue();
            }
        }
    }

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(mode);

        for (;;) {
            Node oldTail = tail;
            if (oldTail !is null) {
                node.setPrevRelaxed(oldTail);
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return node;
                }
            } else {
                initializeSyncQueue();
            }
        }
    }

    /**
     * Sets head of queue to be node, thus dequeuing. Called only by
     * acquire methods. Also nulls out unused fields for sake of GC
     * and to suppress unnecessary signals and traversals.
     *
     * @param node the node
     */
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling. It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            node.compareAndSetWaitStatus(ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node. But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s is null || s.waitStatus > 0) {
            s = null;
            for (Node p = tail; p != node && p !is null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
        }
        implementationMissing(false);
        // if (s !is null)
        //     LockSupport.unpark(s.thread);
    }

    /**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases. This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h !is null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                        continue; // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                    continue; // loop on failed CAS
            }
            if (h == head) // loop if head changed
                break;
        }
    }

    /**
     * Sets head of queue, and checks if successor may be waiting
     * in shared mode, if so propagating if either propagate > 0 or
     * PROPAGATE status was set.
     *
     * @param node the node
     * @param propagate the return value from a tryAcquireShared
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *     PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h is null || h.waitStatus < 0 ||
            (h = head) is null || h.waitStatus < 0) {
            Node s = node.next;
            if (s is null || s.isShared())
                doReleaseShared();
        }
    }

    // Utilities for various versions of acquire

    /**
     * Cancels an ongoing attempt to acquire.
     *
     * @param node the node
     */
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node is null)
            return;

        node.thread = null;

        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary, although with
        // a possibility that a cancelled node may transiently remain
        // reachable.
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;

        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            pred.compareAndSetNext(predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
                pred.thread !is null) {
                Node next = node.next;
                if (next !is null && next.waitStatus <= 0)
                    pred.compareAndSetNext(predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops. Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static bool shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE. Indicate that we
             * need a signal, but don't park yet. Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
        }
        return false;
    }

    /**
     * Convenience method to interrupt current thread.
     */
    static void selfInterrupt() {
        // Thread.getThis().interrupt();
        implementationMissing(false);
    }

    /**
     * Convenience method to park and then check if interrupted.
     *
     * @return {@code true} if interrupted
     */
    private final bool parkAndCheckInterrupt() {
        // LockSupport.park(this);
        // return Thread.interrupted();
        implementationMissing(false);
        return true;
    }

    /*
     * Various flavors of acquire, varying in exclusive/shared and
     * control modes. Each is mostly the same, but annoyingly
     * different. Only a little bit of factoring is possible due to
     * interactions of exception mechanics (including ensuring that we
     * cancel if tryAcquire throws exception) and other control, at
     * least not without hurting performance too much.
     */

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final bool acquireQueued(Node node, int arg) {
        bool interrupted = false;
        try {
            for (;;) {
                Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node))
                    interrupted |= parkAndCheckInterrupt();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            if (interrupted)
                selfInterrupt();
            throw t;
        }
    }

    /**
     * Acquires in exclusive interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireInterruptibly(int arg) {
        Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }

    /**
     * Acquires in exclusive timed mode.
     *
     * @param arg the acquire argument
     * @param nanosTimeout max wait time
     * @return {@code true} if acquired
     */
    private bool doAcquireNanos(int arg, long nanosTimeout) {
        if (nanosTimeout <= 0L)
            return false;
        // Clock.currStdTime() is in hnsecs (100 ns units); scale it to
        // nanoseconds before mixing it with the nanosecond timeout.
        long deadline = Clock.currStdTime() * 100 + nanosTimeout;
        Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return true;
                }
                nanosTimeout = deadline - Clock.currStdTime() * 100;
                if (nanosTimeout <= 0L) {
                    cancelAcquire(node);
                    return false;
                }
                implementationMissing(false);
                // if (shouldParkAfterFailedAcquire(p, node) &&
                //     nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                //     LockSupport.parkNanos(this, nanosTimeout);
                // if (Thread.interrupted())
                //     throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }

    /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        Node node = addWaiter(Node.SHARED);
        bool interrupted = false;
        try {
            for (;;) {
                Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node))
                    interrupted |= parkAndCheckInterrupt();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        } finally {
            if (interrupted)
                selfInterrupt();
        }
    }

    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg) {
        Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }

    /**
     * Acquires in shared timed mode.
     *
     * @param arg the acquire argument
     * @param nanosTimeout max wait time
     * @return {@code true} if acquired
     */
    private bool doAcquireSharedNanos(int arg, long nanosTimeout) {
        if (nanosTimeout <= 0L)
            return false;
        // Clock.currStdTime() is in hnsecs (100 ns units); scale it to
        // nanoseconds before mixing it with the nanosecond timeout.
        long deadline = Clock.currStdTime() * 100 + nanosTimeout;
        Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return true;
                    }
                }
                nanosTimeout = deadline - Clock.currStdTime() * 100;
                if (nanosTimeout <= 0L) {
                    cancelAcquire(node);
                    return false;
                }
                implementationMissing(false);
                // if (shouldParkAfterFailedAcquire(p, node) &&
                //     nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                //     LockSupport.parkNanos(this, nanosTimeout);
                // if (Thread.interrupted())
                //     throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }

    // Main exported methods

    /**
     * Attempts to acquire in exclusive mode. This method should query
     * if the state of the object permits it to be acquired in the
     * exclusive mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire. If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread. This can be used
     * to implement method {@link Lock#tryLock()}.
     *
     * <p>The default
     * implementation throws {@link UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait. The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return {@code true} if successful.
     *         Upon success, this object has been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected bool tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Attempts to set the state to reflect a release in exclusive
     * mode.
     *
     * <p>This method is always invoked by the thread performing release.
     *
     * <p>The default implementation throws
     * {@link UnsupportedOperationException}.
     *
     * @param arg the release argument. This value is always the one
     *        passed to a release method, or the current state value upon
     *        entry to a condition wait. The value is otherwise
     *        uninterpreted and can represent anything you like.
     * @return {@code true} if this object is now in a fully released
     *         state, so that any waiting threads may attempt to acquire;
     *         and {@code false} otherwise.
     * @throws IllegalMonitorStateException if releasing would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected bool tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Attempts to acquire in shared mode. This method should query if
     * the state of the object permits it to be acquired in the shared
     * mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire. If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread.
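     *
     * <p>As an illustration (a sketch only, not an API of this class),
     * a counting-semaphore style subclass might implement this roughly
     * as:
     *
     * <pre> {@code
     * override protected int tryAcquireShared(int acquires) {
     *     for (;;) {
     *         int available = getState();
     *         int remaining = available - acquires;
     *         // A negative result reports failure; a non-negative
     *         // result reports success with the remaining permits.
     *         if (remaining < 0 ||
     *             compareAndSetState(available, remaining))
     *             return remaining;
     *     }
     * }}</pre>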
     *
     * <p>The default implementation throws {@link
     * UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait. The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return a negative value on failure; zero if acquisition in shared
     *         mode succeeded but no subsequent shared-mode acquire can
     *         succeed; and a positive value if acquisition in shared
     *         mode succeeded and subsequent shared-mode acquires might
     *         also succeed, in which case a subsequent waiting thread
     *         must check availability. (Support for three different
     *         return values enables this method to be used in contexts
     *         where acquires only sometimes act exclusively.) Upon
     *         success, this object has been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if shared mode is not supported
     */
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Attempts to set the state to reflect a release in shared mode.
     *
     * <p>This method is always invoked by the thread performing release.
     *
     * <p>The default implementation throws
     * {@link UnsupportedOperationException}.
     *
     * @param arg the release argument. This value is always the one
     *        passed to a release method, or the current state value upon
     *        entry to a condition wait. The value is otherwise
     *        uninterpreted and can represent anything you like.
     * @return {@code true} if this release of shared mode may permit a
     *         waiting acquire (shared or exclusive) to succeed; and
     *         {@code false} otherwise
     * @throws IllegalMonitorStateException if releasing would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if shared mode is not supported
     */
    protected bool tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Returns {@code true} if synchronization is held exclusively with
     * respect to the current (calling) thread. This method is invoked
     * upon each call to a {@link ConditionObject} method.
     *
     * <p>The default implementation throws {@link
     * UnsupportedOperationException}. This method is invoked
     * internally only within {@link ConditionObject} methods, so need
     * not be defined if conditions are not used.
     *
     * @return {@code true} if synchronization is held exclusively;
     *         {@code false} otherwise
     * @throws UnsupportedOperationException if conditions are not supported
     */
    protected bool isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

    /**
     * Acquires in exclusive mode, ignoring interrupts. Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success. Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success. This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument. This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    /**
     * Acquires in exclusive mode, aborting if interrupted.
     * Implemented by first checking interrupt status, then invoking
     * at least once {@link #tryAcquire}, returning on
     * success.  Otherwise the thread is queued, possibly repeatedly
     * blocking and unblocking, invoking {@link #tryAcquire}
     * until success or the thread is interrupted.  This method can be
     * used to implement method {@link Lock#lockInterruptibly}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    final void acquireInterruptibly(int arg) {
        // if (Thread.interrupted())
        //     throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    /**
     * Attempts to acquire in exclusive mode, aborting if interrupted,
     * and failing if the given timeout elapses.  Implemented by first
     * checking interrupt status, then invoking at least once {@link
     * #tryAcquire}, returning on success.  Otherwise, the thread is
     * queued, possibly repeatedly blocking and unblocking, invoking
     * {@link #tryAcquire} until success or the thread is interrupted
     * or the timeout elapses.  This method can be used to implement
     * method {@link Lock#tryLock(long, TimeUnit)}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @param nanosTimeout the maximum number of nanoseconds to wait
     * @return {@code true} if acquired; {@code false} if timed out
     * @throws InterruptedException if the current thread is interrupted
     */
    final bool tryAcquireNanos(int arg, long nanosTimeout) {
        // if (Thread.interrupted())
        //     throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

    /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    final bool release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h !is null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    /**
     * Acquires in shared mode, ignoring interrupts.  Implemented by
     * first invoking at least once {@link #tryAcquireShared},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquireShared} until success.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     */
    final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    /**
     * Acquires in shared mode, aborting if interrupted.  Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success.
     * Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument.
     *        This value is conveyed to {@link #tryAcquireShared} but is
     *        otherwise uninterpreted and can represent anything
     *        you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    final void acquireSharedInterruptibly(int arg) {
        // if (Thread.interrupted())
        //     throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    /**
     * Attempts to acquire in shared mode, aborting if interrupted, and
     * failing if the given timeout elapses.  Implemented by first
     * checking interrupt status, then invoking at least once {@link
     * #tryAcquireShared}, returning on success.  Otherwise, the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted or the timeout elapses.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @param nanosTimeout the maximum number of nanoseconds to wait
     * @return {@code true} if acquired; {@code false} if timed out
     * @throws InterruptedException if the current thread is interrupted
     */
    final bool tryAcquireSharedNanos(int arg, long nanosTimeout) {
        // if (Thread.interrupted())
        //     throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }

    /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    final bool releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    // Queue inspection methods

    /**
     * Queries whether any threads are waiting to acquire.  Note that
     * because cancellations due to interrupts and timeouts may occur
     * at any time, a {@code true} return does not guarantee that any
     * other thread will ever acquire.
     *
     * @return {@code true} if there may be other threads waiting to acquire
     */
    final bool hasQueuedThreads() {
        for (Node p = tail, h = head; p != h && p !is null; p = p.prev)
            if (p.waitStatus <= 0)
                return true;
        return false;
    }

    /**
     * Queries whether any threads have ever contended to acquire this
     * synchronizer; that is, if an acquire method has ever blocked.
     *
     * <p>In this implementation, this operation returns in
     * constant time.
     *
     * @return {@code true} if there has ever been contention
     */
    final bool hasContended() {
        return head !is null;
    }

    /**
     * Returns the first (longest-waiting) thread in the queue, or
     * {@code null} if no threads are currently queued.
     *
     * <p>In this implementation, this operation normally returns in
     * constant time, but may iterate upon contention if other threads are
     * concurrently modifying the queue.
     *
     * @return the first (longest-waiting) thread in the queue, or
     *         {@code null} if no threads are currently queued
     */
    final Thread getFirstQueuedThread() {
        // handle only fast path, else relay
        return (head == tail) ?
            null : fullGetFirstQueuedThread();
    }

    /**
     * Version of getFirstQueuedThread called when fastpath fails.
     */
    private Thread fullGetFirstQueuedThread() {
        /*
         * The first node is normally head.next. Try to get its
         * thread field, ensuring consistent reads: If thread
         * field is nulled out or s.prev is no longer head, then
         * some other thread(s) concurrently performed setHead in
         * between some of our reads. We try this twice before
         * resorting to traversal.
         */
        Node h, s;
        Thread st;
        if (((h = head) !is null && (s = h.next) !is null &&
             s.prev == head && (st = s.thread) !is null) ||
            ((h = head) !is null && (s = h.next) !is null &&
             s.prev == head && (st = s.thread) !is null))
            return st;

        /*
         * Head's next field might not have been set yet, or may have
         * been unset after setHead. So we must check to see if tail
         * is actually first node. If not, we continue on, safely
         * traversing from tail back to head to find first,
         * guaranteeing termination.
         */

        Thread firstThread = null;
        for (Node p = tail; p !is null && p != head; p = p.prev) {
            Thread t = p.thread;
            if (t !is null)
                firstThread = t;
        }
        return firstThread;
    }

    /**
     * Returns true if the given thread is currently queued.
     *
     * <p>This implementation traverses the queue to determine
     * presence of the given thread.
     *
     * @param thread the thread
     * @return {@code true} if the given thread is on the queue
     * @throws NullPointerException if the thread is null
     */
    final bool isQueued(Thread thread) {
        if (thread is null)
            throw new NullPointerException();
        for (Node p = tail; p !is null; p = p.prev)
            if (p.thread == thread)
                return true;
        return false;
    }

    /**
     * Returns {@code true} if the apparent first queued thread, if one
     * exists, is waiting in exclusive mode.  If this method returns
     * {@code true}, and the current thread is attempting to acquire in
     * shared mode (that is, this method is invoked from {@link
     * #tryAcquireShared}) then it is guaranteed that the current thread
     * is not the first queued thread.  Used only as a heuristic in
     * ReentrantReadWriteLock.
     */
    final bool apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) !is null &&
            (s = h.next) !is null &&
            !s.isShared() &&
            s.thread !is null;
    }

    /**
     * Queries whether any threads have been waiting to acquire longer
     * than the current thread.
     *
     * <p>An invocation of this method is equivalent to (but may be
     * more efficient than):
     * <pre> {@code
     * getFirstQueuedThread() != Thread.getThis()
     *   && hasQueuedThreads()}</pre>
     *
     * <p>Note that because cancellations due to interrupts and
     * timeouts may occur at any time, a {@code true} return does not
     * guarantee that some other thread will acquire before the current
     * thread.  Likewise, it is possible for another thread to win a
     * race to enqueue after this method has returned {@code false},
     * due to the queue being empty.
     *
     * <p>This method is designed to be used by a fair synchronizer to
     * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>.
     * Such a synchronizer's {@link #tryAcquire} method should return
     * {@code false}, and its {@link #tryAcquireShared} method should
     * return a negative value, if this method returns {@code true}
     * (unless this is a reentrant acquire).  For example, the {@code
     * tryAcquire} method for a fair, reentrant, exclusive mode
     * synchronizer might look like this:
     *
     * <pre> {@code
     * protected bool tryAcquire(int arg) {
     *   if (isHeldExclusively()) {
     *     // A reentrant acquire; increment hold count
     *     return true;
     *   } else if (hasQueuedPredecessors()) {
     *     return false;
     *   } else {
     *     // try to acquire normally
     *   }
     * }}</pre>
     *
     * @return {@code true} if there is a queued thread preceding the
     *         current thread, and {@code false} if the current thread
     *         is at the head of the queue or the queue is empty
     */
    final bool hasQueuedPredecessors() {
        Node h, s;
        if ((h = head) !is null) {
            if ((s = h.next) is null || s.waitStatus > 0) {
                s = null; // traverse in case of concurrent cancellation
                for (Node p = tail; p != h && p !is null; p = p.prev) {
                    if (p.waitStatus <= 0)
                        s = p;
                }
            }
            if (s !is null && s.thread != Thread.getThis())
                return true;
        }
        return false;
    }

    // Instrumentation and monitoring methods

    /**
     * Returns an estimate of the number of threads waiting to
     * acquire.  The value is only an estimate because the number of
     * threads may change dynamically while this method traverses
     * internal data structures.  This method is designed for use in
     * monitoring system state, not for synchronization control.
     *
     * @return the estimated number of threads waiting to acquire
     */
    final int getQueueLength() {
        int n = 0;
        for (Node p = tail; p !is null; p = p.prev) {
            if (p.thread !is null)
                ++n;
        }
        return n;
    }

    /**
     * Returns a collection containing threads that may be waiting to
     * acquire.  Because the actual set of threads may change
     * dynamically while constructing this result, the returned
     * collection is only a best-effort estimate.  The elements of the
     * returned collection are in no particular order.  This method is
     * designed to facilitate construction of subclasses that provide
     * more extensive monitoring facilities.
     *
     * @return the collection of threads
     */
    final Collection!(Thread) getQueuedThreads() {
        ArrayList!(Thread) list = new ArrayList!(Thread)();
        for (Node p = tail; p !is null; p = p.prev) {
            Thread t = p.thread;
            if (t !is null)
                list.add(t);
        }
        return list;
    }

    /**
     * Returns a collection containing threads that may be waiting to
     * acquire in exclusive mode.  This has the same properties
     * as {@link #getQueuedThreads} except that it only returns
     * those threads waiting due to an exclusive acquire.
     *
     * @return the collection of threads
     */
    final Collection!(Thread) getExclusiveQueuedThreads() {
        ArrayList!(Thread) list = new ArrayList!(Thread)();
        for (Node p = tail; p !is null; p = p.prev) {
            if (!p.isShared()) {
                Thread t = p.thread;
                if (t !is null)
                    list.add(t);
            }
        }
        return list;
    }

    /**
     * Returns a collection containing threads that may be waiting to
     * acquire in shared mode.  This has the same properties
     * as {@link #getQueuedThreads} except that it only returns
     * those threads waiting due to a shared acquire.
     *
     * @return the collection of threads
     */
    final Collection!(Thread) getSharedQueuedThreads() {
        ArrayList!(Thread) list = new ArrayList!(Thread)();
        for (Node p = tail; p !is null; p = p.prev) {
            if (p.isShared()) {
                Thread t = p.thread;
                if (t !is null)
                    list.add(t);
            }
        }
        return list;
    }

    /**
     * Returns a string identifying this synchronizer, as well as its state.
     * The state, in brackets, includes the string {@code "State ="}
     * followed by the current value of {@link #getState}, and either
     * {@code "nonempty"} or {@code "empty"} depending on whether the
     * queue is empty.
     *
     * @return a string identifying this synchronizer, as well as its state
     */
    override string toString() {
        return super.toString()
            ~ "[State = " ~ getState().to!string() ~ ", "
            ~ (hasQueuedThreads() ? "non" : "") ~ "empty queue]";
    }


    // Internal support methods for Conditions

    /**
     * Returns true if a node, always one that was initially placed on
     * a condition queue, is now waiting to reacquire on sync queue.
     * @param node the node
     * @return true if is reacquiring
     */
    final bool isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev is null)
            return false;
        if (node.next !is null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it. It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }

    /**
     * Returns true if node is on sync queue by searching backwards from tail.
     * Called only when needed by isOnSyncQueue.
     * @return true if present
     */
    private bool findNodeFromTail(Node node) {
        // We check for node first, since it's likely to be at or near tail.
        // tail is known to be non-null, so we could re-order to "save"
        // one null check, but we leave it this way to help the VM.
        for (Node p = tail;;) {
            if (p == node)
                return true;
            if (p is null)
                return false;
            p = p.prev;
        }
    }

    /**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     *         cancelled before signal)
     */
    final bool transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        implementationMissing(false);
        // if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
        //     LockSupport.unpark(node.thread);
        return true;
    }

    /**
     * Transfers node, if necessary, to sync queue after a cancelled wait.
     * Returns true if thread was cancelled before being signalled.
     *
     * @param node the node
     * @return true if cancelled before the node was signalled
     */
    final bool transferAfterCancelledWait(Node node) {
        if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().
         * Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

    /**
     * Invokes release with current state value; returns saved state.
     * Cancels node and throws exception on failure.
     * @param node the condition node for this wait
     * @return previous sync state
     */
    final int fullyRelease(Node node) {
        try {
            int savedState = getState();
            if (release(savedState))
                return savedState;
            throw new IllegalMonitorStateException();
        } catch (Throwable t) {
            node.waitStatus = Node.CANCELLED;
            throw t;
        }
    }

    // Instrumentation methods for conditions

    /**
     * Queries whether the given ConditionObject
     * uses this synchronizer as its lock.
     *
     * @param condition the condition
     * @return {@code true} if owned
     * @throws NullPointerException if the condition is null
     */
    // final bool owns(ConditionObject condition) {
    //     return condition.isOwnedBy(this);
    // }

    /**
     * Queries whether any threads are waiting on the given condition
     * associated with this synchronizer. Note that because timeouts
     * and interrupts may occur at any time, a {@code true} return
     * does not guarantee that a future {@code signal} will awaken
     * any threads.  This method is designed primarily for use in
     * monitoring of the system state.
     *
     * @param condition the condition
     * @return {@code true} if there are any waiting threads
     * @throws IllegalMonitorStateException if exclusive synchronization
     *         is not held
     * @throws IllegalArgumentException if the given condition is
     *         not associated with this synchronizer
     * @throws NullPointerException if the condition is null
     */
    // final bool hasWaiters(ConditionObject condition) {
    //     if (!owns(condition))
    //         throw new IllegalArgumentException("Not owner");
    //     return condition.hasWaiters();
    // }

    /**
     * Returns an estimate of the number of threads waiting on the
     * given condition associated with this synchronizer. Note that
     * because timeouts and interrupts may occur at any time, the
     * estimate serves only as an upper bound on the actual number of
     * waiters.  This method is designed for use in monitoring system
     * state, not for synchronization control.
     *
     * @param condition the condition
     * @return the estimated number of waiting threads
     * @throws IllegalMonitorStateException if exclusive synchronization
     *         is not held
     * @throws IllegalArgumentException if the given condition is
     *         not associated with this synchronizer
     * @throws NullPointerException if the condition is null
     */
    // final int getWaitQueueLength(ConditionObject condition) {
    //     if (!owns(condition))
    //         throw new IllegalArgumentException("Not owner");
    //     return condition.getWaitQueueLength();
    // }

    /**
     * Returns a collection containing those threads that may be
     * waiting on the given condition associated with this
     * synchronizer.  Because the actual set of threads may change
     * dynamically while constructing this result, the returned
     * collection is only a best-effort estimate.  The elements of the
     * returned collection are in no particular order.
     *
     * @param condition the condition
     * @return the collection of threads
     * @throws IllegalMonitorStateException if exclusive synchronization
     *         is not held
     * @throws IllegalArgumentException if the given condition is
     *         not associated with this synchronizer
     * @throws NullPointerException if the condition is null
     */
    // final Collection!(Thread) getWaitingThreads(ConditionObject condition) {
    //     if (!owns(condition))
    //         throw new IllegalArgumentException("Not owner");
    //     return condition.getWaitingThreads();
    // }

    /**
     * Condition implementation for a {@link AbstractQueuedSynchronizer}
     * serving as the basis of a {@link Lock} implementation.
     *
     * <p>Method documentation for this class describes mechanics,
     * not behavioral specifications from the point of view of Lock
     * and Condition users.  Exported versions of this class will in
     * general need to be accompanied by documentation describing
     * condition semantics that rely on those of the associated
     * {@code AbstractQueuedSynchronizer}.
     *
     * <p>This class is Serializable, but all fields are transient,
     * so deserialized conditions have no waiters.
     */
    // class ConditionObject : Condition {
    //     /** First node of condition queue. */
    //     private Node firstWaiter;
    //     /** Last node of condition queue. */
    //     private Node lastWaiter;

    //     /**
    //      * Creates a new {@code ConditionObject} instance.
    //      */
    //     this() { }

    //     // Internal methods

    //     /**
    //      * Adds a new waiter to wait queue.
    //      * @return its new wait node
    //      */
    //     private Node addConditionWaiter() {
    //         if (!isHeldExclusively())
    //             throw new IllegalMonitorStateException();
    //         Node t = lastWaiter;
    //         // If lastWaiter is cancelled, clean out.
    //         if (t !is null && t.waitStatus != Node.CONDITION) {
    //             unlinkCancelledWaiters();
    //             t = lastWaiter;
    //         }

    //         Node node = new Node(Node.CONDITION);

    //         if (t is null)
    //             firstWaiter = node;
    //         else
    //             t.nextWaiter = node;
    //         lastWaiter = node;
    //         return node;
    //     }

    //     /**
    //      * Removes and transfers nodes until hit non-cancelled one or
    //      * null. Split out from signal in part to encourage compilers
    //      * to inline the case of no waiters.
    //      * @param first (non-null) the first node on condition queue
    //      */
    //     private void doSignal(Node first) {
    //         do {
    //             if ( (firstWaiter = first.nextWaiter) is null)
    //                 lastWaiter = null;
    //             first.nextWaiter = null;
    //         } while (!transferForSignal(first) &&
    //                  (first = firstWaiter) !is null);
    //     }

    //     /**
    //      * Removes and transfers all nodes.
    //      * @param first (non-null) the first node on condition queue
    //      */
    //     private void doSignalAll(Node first) {
    //         lastWaiter = firstWaiter = null;
    //         do {
    //             Node next = first.nextWaiter;
    //             first.nextWaiter = null;
    //             transferForSignal(first);
    //             first = next;
    //         } while (first !is null);
    //     }

    //     /**
    //      * Unlinks cancelled waiter nodes from condition queue.
    //      * Called only while holding lock. This is called when
    //      * cancellation occurred during condition wait, and upon
    //      * insertion of a new waiter when lastWaiter is seen to have
    //      * been cancelled. This method is needed to avoid garbage
    //      * retention in the absence of signals. So even though it may
    //      * require a full traversal, it comes into play only when
    //      * timeouts or cancellations occur in the absence of
    //      * signals.
    //      * It traverses all nodes rather than stopping at a
    //      * particular target to unlink all pointers to garbage nodes
    //      * without requiring many re-traversals during cancellation
    //      * storms.
    //      */
    //     private void unlinkCancelledWaiters() {
    //         Node t = firstWaiter;
    //         Node trail = null;
    //         while (t !is null) {
    //             Node next = t.nextWaiter;
    //             if (t.waitStatus != Node.CONDITION) {
    //                 t.nextWaiter = null;
    //                 if (trail is null)
    //                     firstWaiter = next;
    //                 else
    //                     trail.nextWaiter = next;
    //                 if (next is null)
    //                     lastWaiter = trail;
    //             }
    //             else
    //                 trail = t;
    //             t = next;
    //         }
    //     }

    //     // methods

    //     /**
    //      * Moves the longest-waiting thread, if one exists, from the
    //      * wait queue for this condition to the wait queue for the
    //      * owning lock.
    //      *
    //      * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
    //      *         returns {@code false}
    //      */
    //     final void signal() {
    //         if (!isHeldExclusively())
    //             throw new IllegalMonitorStateException();
    //         Node first = firstWaiter;
    //         if (first !is null)
    //             doSignal(first);
    //     }

    //     /**
    //      * Moves all threads from the wait queue for this condition to
    //      * the wait queue for the owning lock.
    //      *
    //      * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
    //      *         returns {@code false}
    //      */
    //     final void signalAll() {
    //         if (!isHeldExclusively())
    //             throw new IllegalMonitorStateException();
    //         Node first = firstWaiter;
    //         if (first !is null)
    //             doSignalAll(first);
    //     }

    //     /**
    //      * Implements uninterruptible condition wait.
    //      * <ol>
    //      * <li>Save lock state returned by {@link #getState}.
    //      * <li>Invoke {@link #release} with saved state as argument,
    //      *     throwing IllegalMonitorStateException if it fails.
    //      * <li>Block until signalled.
    //      * <li>Reacquire by invoking specialized version of
    //      *     {@link #acquire} with saved state as argument.
    //      * </ol>
    //      */
    //     final void awaitUninterruptibly() {
    //         Node node = addConditionWaiter();
    //         int savedState = fullyRelease(node);
    //         bool interrupted = false;
    //         while (!isOnSyncQueue(node)) {
    //             implementationMissing(false);
    //             // LockSupport.park(this);
    //             // if (Thread.interrupted())
    //             //     interrupted = true;
    //         }
    //         if (acquireQueued(node, savedState) || interrupted)
    //             selfInterrupt();
    //     }

    //     /*
    //      * For interruptible waits, we need to track whether to throw
    //      * InterruptedException, if interrupted while blocked on
    //      * condition, versus reinterrupt current thread, if
    //      * interrupted while blocked waiting to re-acquire.
    //      */

    //     /** Mode meaning to reinterrupt on exit from wait */
    //     private enum int REINTERRUPT =  1;
    //     /** Mode meaning to throw InterruptedException on exit from wait */
    //     private enum int THROW_IE    = -1;

    //     /**
    //      * Checks for interrupt, returning THROW_IE if interrupted
    //      * before signalled, REINTERRUPT if after signalled, or
    //      * 0 if not interrupted.
    //      */
    //     private int checkInterruptWhileWaiting(Node node) {
    //         // return Thread.interrupted() ?
    //         //     (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
    //         //     0;
    //         // TODO: Tasks pending completion -@zxp at 10/14/2018, 3:41:57 PM
    //         //
    //         return 0;
    //     }

    //     /**
    //      * Throws InterruptedException, reinterrupts current thread, or
    //      * does nothing, depending on mode.
    //      */
    //     private void reportInterruptAfterWait(int interruptMode) {
    //         if (interruptMode == THROW_IE)
    //             throw new InterruptedException();
    //         else if (interruptMode == REINTERRUPT)
    //             selfInterrupt();
    //     }

    //     /**
    //      * Implements interruptible condition wait.
    //      * <ol>
    //      * <li>If current thread is interrupted, throw InterruptedException.
    //      * <li>Save lock state returned by {@link #getState}.
    //      * <li>Invoke {@link #release} with saved state as argument,
    //      *     throwing IllegalMonitorStateException if it fails.
    //      * <li>Block until signalled or interrupted.
    //      * <li>Reacquire by invoking specialized version of
    //      *     {@link #acquire} with saved state as argument.
    //      * <li>If interrupted while blocked in step 4, throw InterruptedException.
    //      * </ol>
    //      */
    //     final void await() {
    //         // if (Thread.interrupted())
    //         //     throw new InterruptedException();
    //         Node node = addConditionWaiter();
    //         int savedState = fullyRelease(node);
    //         int interruptMode = 0;
    //         while (!isOnSyncQueue(node)) {
    //             // LockSupport.park(this);
    //             implementationMissing(false);
    //             if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    //                 break;
    //         }
    //         if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    //             interruptMode = REINTERRUPT;
    //         if (node.nextWaiter !is null) // clean up if cancelled
    //             unlinkCancelledWaiters();
    //         if (interruptMode != 0)
    //             reportInterruptAfterWait(interruptMode);
    //     }

    //     /**
    //      * Implements timed condition wait.
    //      * <ol>
    //      * <li>If current thread is interrupted, throw InterruptedException.
    //      * <li>Save lock state returned by {@link #getState}.
    //      * <li>Invoke {@link #release} with saved state as argument,
    //      *     throwing IllegalMonitorStateException if it fails.
    //      * <li>Block until signalled, interrupted, or timed out.
    //      * <li>Reacquire by invoking specialized version of
    //      *     {@link #acquire} with saved state as argument.
    //      * <li>If interrupted while blocked in step 4, throw InterruptedException.
    //      * </ol>
    //      */
    //     final long awaitNanos(long nanosTimeout) {
    //         // if (Thread.interrupted())
    //         //     throw new InterruptedException();

    //         // We don't check for nanosTimeout <= 0L here, to allow
    //         // awaitNanos(0) as a way to "yield the lock".
    //         // Note: Clock.currStdTime() is in hnsecs (100-ns units), so the
    //         // deadline arithmetic below effectively treats nanosTimeout as
    //         // hnsecs as well; callers should pass a value in the same unit.
    //         long deadline = Clock.currStdTime() + nanosTimeout;
    //         long initialNanos = nanosTimeout;
    //         Node node = addConditionWaiter();
    //         int savedState = fullyRelease(node);
    //         int interruptMode = 0;
    //         while (!isOnSyncQueue(node)) {
    //             if (nanosTimeout <= 0L) {
    //                 transferAfterCancelledWait(node);
    //                 break;
    //             }
    //             // TODO: Tasks pending completion -@zxp at 10/14/2018, 3:39:04 PM
    //             //
    //             implementationMissing(false);
    //             // if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
    //             //     LockSupport.parkNanos(this, nanosTimeout);
    //             if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    //                 break;
    //             nanosTimeout = deadline - Clock.currStdTime();
    //         }
    //         if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    //             interruptMode = REINTERRUPT;
    //         if (node.nextWaiter !is null)
    //             unlinkCancelledWaiters();
    //         if (interruptMode != 0)
    //             reportInterruptAfterWait(interruptMode);
    //         long remaining = deadline - Clock.currStdTime(); // avoid overflow
    //         return (remaining <= initialNanos) ? remaining : long.min;
    //     }

    //     /**
    //      * Implements absolute timed condition wait.
    //      * <ol>
    //      * <li>If current thread is interrupted, throw InterruptedException.
    //      * <li>Save lock state returned by {@link #getState}.
    //  * <li>Invoke {@link #release} with saved state as argument,
    //  *     throwing IllegalMonitorStateException if it fails.
    //  * <li>Block until signalled, interrupted, or timed out.
    //  * <li>Reacquire by invoking specialized version of
    //  *     {@link #acquire} with saved state as argument.
    //  * <li>If interrupted while blocked in step 4, throw InterruptedException.
    //  * <li>If timed out while blocked in step 4, return false, else true.
    //  * </ol>
    //  */
    // final bool awaitUntil(SysTime deadline) {
    //     long abstime = deadline.stdTime();
    //     // if (Thread.interrupted())
    //     //     throw new InterruptedException();
    //     Node node = addConditionWaiter();
    //     int savedState = fullyRelease(node);
    //     bool timedout = false;
    //     int interruptMode = 0;
    //     while (!isOnSyncQueue(node)) {
    //         if (Clock.currStdTime() >= abstime) {
    //             timedout = transferAfterCancelledWait(node);
    //             break;
    //         }
    //         // LockSupport.parkUntil(this, abstime);
    //         // TODO: Tasks pending completion -@zxp at 10/14/2018, 3:38:12 PM
    //         //
    //         implementationMissing(false);
    //         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    //             break;
    //     }
    //     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    //         interruptMode = REINTERRUPT;
    //     if (node.nextWaiter !is null)
    //         unlinkCancelledWaiters();
    //     if (interruptMode != 0)
    //         reportInterruptAfterWait(interruptMode);
    //     return !timedout;
    // }

    // /**
    //  * Implements timed condition wait.
    //  * <ol>
    //  * <li>If current thread is interrupted, throw InterruptedException.
    //  * <li>Save lock state returned by {@link #getState}.
    //  * <li>Invoke {@link #release} with saved state as argument,
    //  *     throwing IllegalMonitorStateException if it fails.
    //  * <li>Block until signalled, interrupted, or timed out.
    //  * <li>Reacquire by invoking specialized version of
    //  *     {@link #acquire} with saved state as argument.
    //  * <li>If interrupted while blocked in step 4, throw InterruptedException.
    //  * <li>If timed out while blocked in step 4, return false, else true.
    //  * </ol>
    //  */
    // final bool await(Duration time) {
    //     long nanosTimeout = time.total!(TimeUnit.HectoNanosecond)();
    //     // if (Thread.interrupted())
    //     //     throw new InterruptedException();
    //     // We don't check for nanosTimeout <= 0L here, to allow
    //     // await(0, unit) as a way to "yield the lock".
    //     long deadline = Clock.currStdTime() + nanosTimeout;
    //     Node node = addConditionWaiter();
    //     int savedState = fullyRelease(node);
    //     bool timedout = false;
    //     int interruptMode = 0;
    //     while (!isOnSyncQueue(node)) {
    //         if (nanosTimeout <= 0L) {
    //             timedout = transferAfterCancelledWait(node);
    //             break;
    //         }
    //         // TODO: Tasks pending completion -@zxp at 10/14/2018, 3:37:19 PM
    //         //
    //         implementationMissing(false);
    //         // if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
    //         //     LockSupport.parkNanos(this, nanosTimeout);
    //         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    //             break;
    //         nanosTimeout = deadline - Clock.currStdTime();
    //     }
    //     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    //         interruptMode = REINTERRUPT;
    //     if (node.nextWaiter !is null)
    //         unlinkCancelledWaiters();
    //     if (interruptMode != 0)
    //         reportInterruptAfterWait(interruptMode);
    //     return !timedout;
    // }

    // // support for instrumentation

    // /**
    //  * Returns true if this condition was created by the given
    //  * synchronization object.
    //  *
    //  * @return {@code true} if owned
    //  */
    // final bool isOwnedBy(AbstractQueuedSynchronizer sync) {
    //     return sync == this.outer; // AbstractQueuedSynchronizer.this;
    // }

    // /**
    //  * Queries whether any threads are waiting on this condition.
    //  * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
    //  *
    //  * @return {@code true} if there are any waiting threads
    //  * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
    //  *         returns {@code false}
    //  */
    // protected final bool hasWaiters() {
    //     if (!isHeldExclusively())
    //         throw new IllegalMonitorStateException();
    //     for (Node w = firstWaiter; w !is null; w = w.nextWaiter) {
    //         if (w.waitStatus == Node.CONDITION)
    //             return true;
    //     }
    //     return false;
    // }

    // /**
    //  * Returns an estimate of the number of threads waiting on
    //  * this condition.
    //  * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
    //  *
    //  * @return the estimated number of waiting threads
    //  * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
    //  *         returns {@code false}
    //  */
    // protected final int getWaitQueueLength() {
    //     if (!isHeldExclusively())
    //         throw new IllegalMonitorStateException();
    //     int n = 0;
    //     for (Node w = firstWaiter; w !is null; w = w.nextWaiter) {
    //         if (w.waitStatus == Node.CONDITION)
    //             ++n;
    //     }
    //     return n;
    // }

    // /**
    //  * Returns a collection containing those threads that may be
    //  * waiting on this Condition.
    //  * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
    //  *
    //  * @return the collection of threads
    //  * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
    //  *         returns {@code false}
    //  */
    // protected final Collection!Thread getWaitingThreads() {
    //     if (!isHeldExclusively())
    //         throw new IllegalMonitorStateException();
    //     ArrayList!Thread list = new ArrayList!Thread();
    //     for (Node w = firstWaiter; w !is null; w = w.nextWaiter) {
    //         if (w.waitStatus == Node.CONDITION) {
    //             Thread t = w.thread;
    //             if (t !is null)
    //                 list.add(t);
    //         }
    //     }
    //     return list;
    // }
    // }

    /**
     * Initializes head and tail fields on first contention.
     */
    private final void initializeSyncQueue() {
        Node h;
        if (AtomicHelper.compareAndSet(head, null, (h = new Node())))
            tail = h;
    }

    /**
     * CASes tail field.
     */
    private final bool compareAndSetTail(Node expect, Node update) {
        return AtomicHelper.compareAndSet(tail, expect, update);
    }
}


/**
 * Wait queue node class.
 *
 * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
 * Hagersten) lock queue. CLH locks are normally used for
 * spinlocks. We instead use them for blocking synchronizers, but
 * use the same basic tactic of holding some of the control
 * information about a thread in the predecessor of its node. A
 * "status" field in each node keeps track of whether a thread
 * should block. A node is signalled when its predecessor
 * releases. Each node of the queue otherwise serves as a
 * specific-notification-style monitor holding a single waiting
 * thread. The status field does NOT control whether threads are
 * granted locks etc though. A thread may try to acquire if it is
 * first in the queue. But being first does not guarantee success;
 * it only gives the right to contend.
 * So the currently released contender thread may need to rewait.
 *
 * <p>To enqueue into a CLH lock, you atomically splice it in as new
 * tail. To dequeue, you just set the head field.
 * <pre>
 *      +------+  prev +-----+       +-----+
 * head |      | <---- |     | <---- |     |  tail
 *      +------+       +-----+       +-----+
 * </pre>
 *
 * <p>Insertion into a CLH queue requires only a single atomic
 * operation on "tail", so there is a simple atomic point of
 * demarcation from unqueued to queued. Similarly, dequeuing
 * involves only updating the "head". However, it takes a bit
 * more work for nodes to determine who their successors are,
 * in part to deal with possible cancellation due to timeouts
 * and interrupts.
 *
 * <p>The "prev" links (not used in original CLH locks) are mainly
 * needed to handle cancellation. If a node is cancelled, its
 * successor is (normally) relinked to a non-cancelled
 * predecessor. For explanation of similar mechanics in the case
 * of spin locks, see the papers by Scott and Scherer at
 * http://www.cs.rochester.edu/u/scott/synchronization/
 *
 * <p>We also use "next" links to implement blocking mechanics.
 * The thread id for each node is kept in its own node, so a
 * predecessor signals the next node to wake up by traversing
 * the next link to determine which thread it is. Determination of
 * successor must avoid races with newly queued nodes to set
 * the "next" fields of their predecessors. This is solved
 * when necessary by checking backwards from the atomically
 * updated "tail" when a node's successor appears to be null.
 * (Or, said differently, the next-links are an optimization
 * so that we don't usually need a backward scan.)
 *
 * <p>Cancellation introduces some conservatism to the basic
 * algorithms.
 * Since we must poll for cancellation of other
 * nodes, we can miss noticing whether a cancelled node is
 * ahead or behind us. This is dealt with by always unparking
 * successors upon cancellation, allowing them to stabilize on
 * a new predecessor, unless we can identify an uncancelled
 * predecessor who will carry this responsibility.
 *
 * <p>CLH queues need a dummy header node to get started. But
 * we don't create them on construction, because it would be wasted
 * effort if there is never contention. Instead, the node
 * is constructed and head and tail pointers are set upon first
 * contention.
 *
 * <p>Threads waiting on Conditions use the same nodes, but
 * use an additional link. Conditions only need to link nodes
 * in simple (non-concurrent) linked queues because they are
 * only accessed when exclusively held. Upon await, a node is
 * inserted into a condition queue. Upon signal, the node is
 * transferred to the main queue. A special value of status
 * field is used to mark which queue a node is on.
 *
 * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
 * Scherer and Michael Scott, along with members of JSR-166
 * expert group, for helpful ideas, discussions, and critiques
 * on the design of this class.
 */
private final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    __gshared Node SHARED; // = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    __gshared Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled. */
    enum int CANCELLED = 1;
    /** waitStatus value to indicate successor's thread needs unparking. */
    enum int SIGNAL = -1;
    /** waitStatus value to indicate thread is waiting on condition.
     */
    enum int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate.
     */
    enum int PROPAGATE = -3;

    /**
     * Status field, taking on only the values:
     *   SIGNAL:     The successor of this node is (or will soon be)
     *               blocked (via park), so the current node must
     *               unpark its successor when it releases or
     *               cancels. To avoid races, acquire methods must
     *               first indicate they need a signal,
     *               then retry the atomic acquire, and then,
     *               on failure, block.
     *   CANCELLED:  This node is cancelled due to timeout or interrupt.
     *               Nodes never leave this state. In particular,
     *               a thread with cancelled node never again blocks.
     *   CONDITION:  This node is currently on a condition queue.
     *               It will not be used as a sync queue node
     *               until transferred, at which time the status
     *               will be set to 0. (Use of this value here has
     *               nothing to do with the other uses of the
     *               field, but simplifies mechanics.)
     *   PROPAGATE:  A releaseShared should be propagated to other
     *               nodes. This is set (for head node only) in
     *               doReleaseShared to ensure propagation
     *               continues, even if other operations have
     *               since intervened.
     *   0:          None of the above
     *
     * The values are arranged numerically to simplify use.
     * Non-negative values mean that a node doesn't need to
     * signal. So, most code doesn't need to check for particular
     * values, just for sign.
     *
     * The field is initialized to 0 for normal sync nodes, and
     * CONDITION for condition nodes. It is modified using CAS
     * (or when possible, unconditional writes).
     */
    int waitStatus;

    /**
     * Link to predecessor node that current node/thread relies on
     * for checking waitStatus. Assigned during enqueuing, and nulled
     * out (for sake of GC) only upon dequeuing.
     * Also, upon
     * cancellation of a predecessor, we short-circuit while
     * finding a non-cancelled one, which will always exist
     * because the head node is never cancelled: A node becomes
     * head only as a result of successful acquire. A
     * cancelled thread never succeeds in acquiring, and a thread only
     * cancels itself, not any other node.
     */
    Node prev;

    /**
     * Link to the successor node that the current node/thread
     * unparks upon release. Assigned during enqueuing, adjusted
     * when bypassing cancelled predecessors, and nulled out (for
     * sake of GC) when dequeued. The enq operation does not
     * assign the next field of a predecessor until after attachment,
     * so seeing a null next field does not necessarily mean that
     * the node is at the end of the queue. However, if a next field appears
     * to be null, we can scan prev's from the tail to
     * double-check. The next field of cancelled nodes is set to
     * point to the node itself instead of null, to make life
     * easier for isOnSyncQueue.
     */
    Node next;

    /**
     * The thread that enqueued this node. Initialized on
     * construction and nulled out after use.
     */
    Thread thread;

    /**
     * Link to next node waiting on condition, or the special
     * value SHARED. Because condition queues are accessed only
     * when holding in exclusive mode, we just need a simple
     * linked queue to hold nodes while they are waiting on
     * conditions. They are then transferred to the queue to
     * re-acquire. And because conditions can only be exclusive,
     * we save a field by using a special value to indicate shared
     * mode.
     */
    Node nextWaiter;

    /**
     * Returns true if node is waiting in shared mode.
     */
    final bool isShared() {
        return nextWaiter == SHARED;
    }

    /**
     * Returns previous node, or throws NullPointerException if null.
     * Use when predecessor cannot be null. The null check could
     * be elided, but is present to help the VM.
     *
     * @return the predecessor of this node
     */
    Node predecessor() {
        Node p = prev;
        if (p is null)
            throw new NullPointerException();
        else
            return p;
    }

    shared static this() {
        SHARED = new Node();
    }

    /** Establishes initial head or SHARED marker. */
    this() {}

    /** Constructor used by addWaiter. */
    this(Node nextWaiter) {
        this.nextWaiter = nextWaiter;
        thread = Thread.getThis();
    }

    /** Constructor used by addConditionWaiter. */
    this(int waitStatus) {
        this.waitStatus = waitStatus;
        thread = Thread.getThis();
    }

    /** CASes waitStatus field. */
    final bool compareAndSetWaitStatus(int expect, int update) {
        return AtomicHelper.compareAndSet(waitStatus, expect, update);
    }

    /** CASes next field. */
    final bool compareAndSetNext(Node expect, Node update) {
        return AtomicHelper.compareAndSet(next, expect, update);
    }

    final void setPrevRelaxed(Node p) {
        this.prev = p;
    }
}
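
/*
 * Usage sketch (not part of the library): the classic AQS "Mutex"
 * example from the JDK documentation, adapted to D. It assumes the
 * protected API documented above (getState/setState/compareAndSetState,
 * tryAcquire/tryRelease, isHeldExclusively, and the owner-thread
 * helpers from AbstractOwnableSynchronizer); exact signatures in this
 * port may differ, so the sketch is kept out of the build behind a
 * version switch and is intended only as a guide for subclass authors.
 */
version (Hunt_AQS_Example) {
    class Mutex : AbstractQueuedSynchronizer {
        // Acquire succeeds only when state moves atomically 0 -> 1.
        override protected bool tryAcquire(int acquires) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.getThis());
                return true;
            }
            return false;
        }

        // Release resets state to 0; releasing an unheld lock is an error.
        override protected bool tryRelease(int releases) {
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        override protected bool isHeldExclusively() {
            return getState() == 1;
        }

        // Lock-style facade: the framework supplies queuing and blocking.
        void lock()    { acquire(1); }
        bool tryLock() { return tryAcquire(1); }
        void unlock()  { release(1); }
    }
}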