1 /* 2 * Hunt - A refined core library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.concurrency.thread.LockSupport; 13 14 import core.atomic; 15 import core.thread; 16 import core.time; 17 18 import hunt.concurrency.thread.ThreadEx; 19 import hunt.Exceptions; 20 import hunt.logging.ConsoleLogger; 21 import hunt.util.DateTime; 22 23 24 /** 25 * Basic thread blocking primitives for creating locks and other 26 * synchronization classes. 27 * 28 * <p>This class associates, with each thread that uses it, a permit 29 * (in the sense of the {@link java.util.concurrent.Semaphore 30 * Semaphore} class). A call to {@code park} will return immediately 31 * if the permit is available, consuming it in the process; otherwise 32 * it <em>may</em> block. A call to {@code unpark} makes the permit 33 * available, if it was not already available. (Unlike with Semaphores 34 * though, permits do not accumulate. There is at most one.) 35 * Reliable usage requires the use of volatile (or atomic) variables 36 * to control when to park or unpark. Orderings of calls to these 37 * methods are maintained with respect to volatile variable accesses, 38 * but not necessarily non-volatile variable accesses. 39 * 40 * <p>Methods {@code park} and {@code unpark} provide efficient 41 * means of blocking and unblocking threads that do not encounter the 42 * problems that cause the deprecated methods {@code Thread.suspend} 43 * and {@code Thread.resume} to be unusable for such purposes: Races 44 * between one thread invoking {@code park} and another thread trying 45 * to {@code unpark} it will preserve liveness, due to the 46 * permit. Additionally, {@code park} will return if the caller's 47 * thread was interrupted, and timeout versions are supported. The 48 * {@code park} method may also return at any other time, for "no 49 * reason", so in general must be invoked within a loop that rechecks 50 * conditions upon return. In this sense {@code park} serves as an 51 * optimization of a "busy wait" that does not waste as much time 52 * spinning, but must be paired with an {@code unpark} to be 53 * effective. 54 * 55 * <p>The three forms of {@code park} each also support a 56 * {@code blocker} object parameter. This object is recorded while 57 * the thread is blocked to permit monitoring and diagnostic tools to 58 * identify the reasons that threads are blocked. (Such tools may 59 * access blockers using method {@link #getBlocker(Thread)}.) 60 * The use of these forms rather than the original forms without this 61 * parameter is strongly encouraged. The normal argument to supply as 62 * a {@code blocker} within a lock implementation is {@code this}. 63 * 64 * <p>These methods are designed to be used as tools for creating 65 * higher-level synchronization utilities, and are not in themselves 66 * useful for most concurrency control applications. The {@code park} 67 * method is designed for use only in constructions of the form: 68 * 69 * <pre> {@code 70 * while (!canProceed()) { 71 * // ensure request to unpark is visible to other threads 72 * ... 73 * LockSupport.park(this); 74 * }}</pre> 75 * 76 * where no actions by the thread publishing a request to unpark, 77 * prior to the call to {@code park}, entail locking or blocking. 78 * Because only one permit is associated with each thread, any 79 * intermediary uses of {@code park}, including implicitly via class 80 * loading, could lead to an unresponsive thread (a "lost unpark"). 81 * 82 * <p><b>Sample Usage.</b> Here is a sketch of a first-in-first-out 83 * non-reentrant lock class: 84 * <pre> {@code 85 * class FIFOMutex { 86 * private final AtomicBoolean locked = new AtomicBoolean(false); 87 * private final Queue!(Thread) waiters 88 * = new ConcurrentLinkedQueue<>(); 89 * 90 * void lock() { 91 * boolean wasInterrupted = false; 92 * // publish current thread for unparkers 93 * waiters.add(Thread.currentThread()); 94 * 95 * // Block while not first in queue or cannot acquire lock 96 * while (waiters.peek() != Thread.currentThread() || 97 * !locked.compareAndSet(false, true)) { 98 * LockSupport.park(this); 99 * // ignore interrupts while waiting 100 * if (Thread.interrupted()) 101 * wasInterrupted = true; 102 * } 103 * 104 * waiters.remove(); 105 * // ensure correct interrupt status on return 106 * if (wasInterrupted) 107 * Thread.currentThread().interrupt(); 108 * } 109 * 110 * void unlock() { 111 * locked.set(false); 112 * LockSupport.unpark(waiters.peek()); 113 * } 114 * 115 * static { 116 * // Reduce the risk of "lost unpark" due to classloading 117 * Class<?> ensureLoaded = LockSupport.class; 118 * } 119 * }}</pre> 120 * 121 */ 122 class LockSupport { 123 private static Parker _parker; 124 private __gshared Parker[Thread] parkers; 125 private shared static bool m_lock; 126 127 private this() {} // Cannot be instantiated. 128 129 static Parker getParker() { 130 if(_parker is null) { 131 Thread t = Thread.getThis(); 132 ThreadEx tx = cast(ThreadEx)t; 133 if(tx !is null) { 134 return tx.parker(); 135 } else { 136 _parker = createParker(t); 137 } 138 } 139 return _parker; 140 } 141 142 static Parker getParker(Thread t) { 143 if(t is Thread.getThis()) 144 return getParker(); 145 146 ThreadEx tx = cast(ThreadEx)t; 147 if(tx !is null) { 148 return tx.parker(); 149 } else { 150 Parker* itemPtr = t in parkers; 151 if(itemPtr is null) { 152 _parker = createParker(t); 153 } 154 155 return *itemPtr; 156 } 157 } 158 159 private static Parker createParker(Thread t) { 160 version(HUNT_DEBUG) info("creating a new parker for " ~ typeid(t).name); 161 Parker p = Parker.allocate(t); 162 163 while(!cas(&m_lock, false, true)) { 164 // waitting... 165 } 166 167 parkers[t] = p; 168 m_lock = false; 169 170 return p; 171 } 172 173 static void removeParker() { 174 removeParker(Thread.getThis); 175 _parker = null; 176 } 177 178 static void removeParker(Thread t) { 179 while(!cas(&m_lock, false, true)) { 180 } 181 parkers.remove(t); 182 m_lock = false; 183 } 184 185 /** 186 * Makes available the permit for the given thread, if it 187 * was not already available. If the thread was blocked on 188 * {@code park} then it will unblock. Otherwise, its next call 189 * to {@code park} is guaranteed not to block. This operation 190 * is not guaranteed to have any effect at all if the given 191 * thread has not been started. 192 * 193 * @param thread the thread to unpark, or {@code null}, in which case 194 * this operation has no effect 195 */ 196 static void unpark(Thread thread) { 197 getParker(thread).unpark(); 198 } 199 200 static void unpark() { 201 getParker().unpark(); 202 } 203 204 205 /** 206 * Disables the current thread for thread scheduling purposes unless the 207 * permit is available. 208 * 209 * <p>If the permit is available then it is consumed and the call 210 * returns immediately; otherwise the current thread becomes disabled 211 * for thread scheduling purposes and lies dormant until one of three 212 * things happens: 213 * 214 * <ul> 215 * 216 * <li>Some other thread invokes {@link #unpark unpark} with the 217 * current thread as the target; or 218 * 219 * <li>Some other thread {@linkplain Thread#interrupt interrupts} 220 * the current thread; or 221 * 222 * <li>The call spuriously (that is, for no reason) returns. 223 * </ul> 224 * 225 * <p>This method does <em>not</em> report which of these caused the 226 * method to return. Callers should re-check the conditions which caused 227 * the thread to park in the first place. Callers may also determine, 228 * for example, the interrupt status of the thread upon return. 229 */ 230 static void park() { 231 getParker().park(Duration.zero); 232 } 233 234 static void park(Duration time) { 235 getParker().park(time); 236 } 237 238 /** 239 * Disables the current thread for thread scheduling purposes unless the 240 * permit is available. 241 * 242 * <p>If the permit is available then it is consumed and the call returns 243 * immediately; otherwise 244 * the current thread becomes disabled for thread scheduling 245 * purposes and lies dormant until one of three things happens: 246 * 247 * <ul> 248 * <li>Some other thread invokes {@link #unpark unpark} with the 249 * current thread as the target; or 250 * 251 * <li>Some other thread {@linkplain Thread#interrupt interrupts} 252 * the current thread; or 253 * 254 * <li>The call spuriously (that is, for no reason) returns. 255 * </ul> 256 * 257 * <p>This method does <em>not</em> report which of these caused the 258 * method to return. Callers should re-check the conditions which caused 259 * the thread to park in the first place. Callers may also determine, 260 * for example, the interrupt status of the thread upon return. 261 * 262 * @param blocker the synchronization object responsible for this 263 * thread parking 264 */ 265 static void park(Object blocker) { 266 park(blocker, Duration.zero); 267 } 268 269 static void park(Object blocker, Duration time) { 270 if (time >= Duration.zero) { 271 Parker p = getParker(); 272 p.setBlocker(blocker); 273 p.park(time); 274 p.setBlocker(null); 275 } else { 276 warning("The time must be greater than 0."); 277 } 278 } 279 280 /** 281 * Disables the current thread for thread scheduling purposes, for up to 282 * the specified waiting time, unless the permit is available. 283 * 284 * <p>If the permit is available then it is consumed and the call 285 * returns immediately; otherwise the current thread becomes disabled 286 * for thread scheduling purposes and lies dormant until one of four 287 * things happens: 288 * 289 * <ul> 290 * <li>Some other thread invokes {@link #unpark unpark} with the 291 * current thread as the target; or 292 * 293 * <li>Some other thread {@linkplain Thread#interrupt interrupts} 294 * the current thread; or 295 * 296 * <li>The specified waiting time elapses; or 297 * 298 * <li>The call spuriously (that is, for no reason) returns. 299 * </ul> 300 * 301 * <p>This method does <em>not</em> report which of these caused the 302 * method to return. Callers should re-check the conditions which caused 303 * the thread to park in the first place. Callers may also determine, 304 * for example, the interrupt status of the thread, or the elapsed time 305 * upon return. 306 * 307 * @param blocker the synchronization object responsible for this 308 * thread parking 309 * @param nanos the maximum number of nanoseconds to wait 310 */ 311 312 deprecated("Using park(Object, Duration) instead.") 313 static void parkNanos(Object blocker, long nanos) { 314 if(nanos > 0) { 315 park(blocker, dur!(TimeUnit.Nanosecond)(nanos)); 316 } 317 } 318 319 /** 320 * Disables the current thread for thread scheduling purposes, for up to 321 * the specified waiting time, unless the permit is available. 322 * 323 * <p>If the permit is available then it is consumed and the call 324 * returns immediately; otherwise the current thread becomes disabled 325 * for thread scheduling purposes and lies dormant until one of four 326 * things happens: 327 * 328 * <ul> 329 * <li>Some other thread invokes {@link #unpark unpark} with the 330 * current thread as the target; or 331 * 332 * <li>Some other thread {@linkplain Thread#interrupt interrupts} 333 * the current thread; or 334 * 335 * <li>The specified waiting time elapses; or 336 * 337 * <li>The call spuriously (that is, for no reason) returns. 338 * </ul> 339 * 340 * <p>This method does <em>not</em> report which of these caused the 341 * method to return. Callers should re-check the conditions which caused 342 * the thread to park in the first place. Callers may also determine, 343 * for example, the interrupt status of the thread, or the elapsed time 344 * upon return. 345 * 346 * @param nanos the maximum number of nanoseconds to wait 347 */ 348 deprecated("Using park(Duration) instead.") 349 static void parkNanos(long nanos) { 350 if (nanos > 0) { 351 getParker().park(nanos.nsecs); 352 } 353 } 354 355 /** 356 * Disables the current thread for thread scheduling purposes, until 357 * the specified deadline, unless the permit is available. 358 * 359 * <p>If the permit is available then it is consumed and the call 360 * returns immediately; otherwise the current thread becomes disabled 361 * for thread scheduling purposes and lies dormant until one of four 362 * things happens: 363 * 364 * <ul> 365 * <li>Some other thread invokes {@link #unpark unpark} with the 366 * current thread as the target; or 367 * 368 * <li>Some other thread {@linkplain Thread#interrupt interrupts} the 369 * current thread; or 370 * 371 * <li>The specified deadline passes; or 372 * 373 * <li>The call spuriously (that is, for no reason) returns. 374 * </ul> 375 * 376 * <p>This method does <em>not</em> report which of these caused the 377 * method to return. Callers should re-check the conditions which caused 378 * the thread to park in the first place. Callers may also determine, 379 * for example, the interrupt status of the thread, or the current time 380 * upon return. 381 * 382 * @param blocker the synchronization object responsible for this 383 * thread parking 384 * @param deadline the absolute time, in milliseconds from the Epoch, 385 * to wait until 386 */ 387 static void parkUntil(Object blocker, MonoTime deadline) { 388 Parker p = getParker(); 389 p.setBlocker(blocker); 390 p.park(deadline); 391 p.setBlocker(null); 392 } 393 394 // deprecated("Using parkUntil(Object, Duration) instead.") 395 // static void parkUntil(Object blocker, long deadline) { 396 // parkUntil(blocker, deadline.msecs); 397 // } 398 399 /** 400 * Disables the current thread for thread scheduling purposes, until 401 * the specified deadline, unless the permit is available. 402 * 403 * <p>If the permit is available then it is consumed and the call 404 * returns immediately; otherwise the current thread becomes disabled 405 * for thread scheduling purposes and lies dormant until one of four 406 * things happens: 407 * 408 * <ul> 409 * <li>Some other thread invokes {@link #unpark unpark} with the 410 * current thread as the target; or 411 * 412 * <li>Some other thread {@linkplain Thread#interrupt interrupts} 413 * the current thread; or 414 * 415 * <li>The specified deadline passes; or 416 * 417 * <li>The call spuriously (that is, for no reason) returns. 418 * </ul> 419 * 420 * <p>This method does <em>not</em> report which of these caused the 421 * method to return. Callers should re-check the conditions which caused 422 * the thread to park in the first place. Callers may also determine, 423 * for example, the interrupt status of the thread, or the current time 424 * upon return. 425 * 426 * @param deadline the absolute time, in milliseconds from the Epoch, 427 * to wait until 428 */ 429 static void parkUntil(MonoTime deadline) { 430 getParker().park(deadline); 431 } 432 433 deprecated("Using parkUntil(Duration) instead.") 434 static void parkUntil(long deadline) { 435 parkUntil(MonoTime(deadline)); 436 } 437 438 439 /** 440 * Returns the blocker object supplied to the most recent 441 * invocation of a park method that has not yet unblocked, or null 442 * if not blocked. The value returned is just a momentary 443 * snapshot -- the thread may have since unblocked or blocked on a 444 * different blocker object. 445 * 446 * @param t the thread 447 * @return the blocker 448 * @throws NullPointerException if argument is null 449 */ 450 static Object getBlocker(Thread t) { 451 return getParker(t).getBlocker(); 452 } 453 454 /** 455 * Returns the pseudo-randomly initialized or updated secondary seed. 456 * Copied from ThreadLocalRandom due to package access restrictions. 457 */ 458 // static final int nextSecondarySeed() { 459 // int r; 460 // Thread t = Thread.currentThread(); 461 // if ((r = U.getInt(t, SECONDARY)) != 0) { 462 // r ^= r << 13; // xorshift 463 // r ^= r >>> 17; 464 // r ^= r << 5; 465 // } 466 // else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0) 467 // r = 1; // avoid zero 468 // U.putInt(t, SECONDARY, r); 469 // return r; 470 // } 471 472 /** 473 * Returns the thread id for the given thread. We must access 474 * this directly rather than via method Thread.getId() because 475 * getId() has been known to be overridden in ways that do not 476 * preserve unique mappings. 477 */ 478 // static final long getThreadId(Thread thread) { 479 // return U.getLong(thread, TID); 480 // } 481 482 }