1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.concurrency.thread.LockSupport;
13 
14 import core.atomic;
15 import core.thread;
16 import core.time;
17 
18 import hunt.concurrency.thread.ThreadEx;
19 import hunt.Exceptions;
20 import hunt.logging.ConsoleLogger;
21 import hunt.util.DateTime;
22 
23 
24 /**
25  * Basic thread blocking primitives for creating locks and other
26  * synchronization classes.
27  *
28  * <p>This class associates, with each thread that uses it, a permit
29  * (in the sense of the {@link java.util.concurrent.Semaphore
30  * Semaphore} class). A call to {@code park} will return immediately
31  * if the permit is available, consuming it in the process; otherwise
32  * it <em>may</em> block.  A call to {@code unpark} makes the permit
33  * available, if it was not already available. (Unlike with Semaphores
34  * though, permits do not accumulate. There is at most one.)
35  * Reliable usage requires the use of volatile (or atomic) variables
36  * to control when to park or unpark.  Orderings of calls to these
37  * methods are maintained with respect to volatile variable accesses,
38  * but not necessarily non-volatile variable accesses.
39  *
40  * <p>Methods {@code park} and {@code unpark} provide efficient
41  * means of blocking and unblocking threads that do not encounter the
42  * problems that cause the deprecated methods {@code Thread.suspend}
43  * and {@code Thread.resume} to be unusable for such purposes: Races
44  * between one thread invoking {@code park} and another thread trying
45  * to {@code unpark} it will preserve liveness, due to the
46  * permit. Additionally, {@code park} will return if the caller's
47  * thread was interrupted, and timeout versions are supported. The
48  * {@code park} method may also return at any other time, for "no
49  * reason", so in general must be invoked within a loop that rechecks
50  * conditions upon return. In this sense {@code park} serves as an
51  * optimization of a "busy wait" that does not waste as much time
52  * spinning, but must be paired with an {@code unpark} to be
53  * effective.
54  *
55  * <p>The three forms of {@code park} each also support a
56  * {@code blocker} object parameter. This object is recorded while
57  * the thread is blocked to permit monitoring and diagnostic tools to
58  * identify the reasons that threads are blocked. (Such tools may
59  * access blockers using method {@link #getBlocker(Thread)}.)
60  * The use of these forms rather than the original forms without this
61  * parameter is strongly encouraged. The normal argument to supply as
62  * a {@code blocker} within a lock implementation is {@code this}.
63  *
64  * <p>These methods are designed to be used as tools for creating
65  * higher-level synchronization utilities, and are not in themselves
66  * useful for most concurrency control applications.  The {@code park}
67  * method is designed for use only in constructions of the form:
68  *
69  * <pre> {@code
70  * while (!canProceed()) {
71  *   // ensure request to unpark is visible to other threads
72  *   ...
73  *   LockSupport.park(this);
74  * }}</pre>
75  *
76  * where no actions by the thread publishing a request to unpark,
77  * prior to the call to {@code park}, entail locking or blocking.
78  * Because only one permit is associated with each thread, any
79  * intermediary uses of {@code park}, including implicitly via class
80  * loading, could lead to an unresponsive thread (a "lost unpark").
81  *
82  * <p><b>Sample Usage.</b> Here is a sketch of a first-in-first-out
83  * non-reentrant lock class:
84  * <pre> {@code
85  * class FIFOMutex {
86  *   private final AtomicBoolean locked = new AtomicBoolean(false);
87  *   private final Queue!(Thread) waiters
88  *     = new ConcurrentLinkedQueue<>();
89  *
90  *   void lock() {
91  *     boolean wasInterrupted = false;
92  *     // publish current thread for unparkers
93  *     waiters.add(Thread.currentThread());
94  *
95  *     // Block while not first in queue or cannot acquire lock
96  *     while (waiters.peek() != Thread.currentThread() ||
97  *            !locked.compareAndSet(false, true)) {
98  *       LockSupport.park(this);
99  *       // ignore interrupts while waiting
100  *       if (Thread.interrupted())
101  *         wasInterrupted = true;
102  *     }
103  *
104  *     waiters.remove();
105  *     // ensure correct interrupt status on return
106  *     if (wasInterrupted)
107  *       Thread.currentThread().interrupt();
108  *   }
109  *
110  *   void unlock() {
111  *     locked.set(false);
112  *     LockSupport.unpark(waiters.peek());
113  *   }
114  *
115  *   static {
116  *     // Reduce the risk of "lost unpark" due to classloading
117  *     Class<?> ensureLoaded = LockSupport.class;
118  *   }
119  * }}</pre>
120  *
121  */
122 class LockSupport {
123     private static Parker _parker;    
124     private __gshared Parker[Thread] parkers;
125     private shared static bool m_lock;
126 
127     private this() {} // Cannot be instantiated.
128 
129     static Parker getParker() {
130         if(_parker is null) {
131             Thread t = Thread.getThis();
132             ThreadEx tx = cast(ThreadEx)t;
133             if(tx !is null) {
134                 return tx.parker();
135             } else {
136                 _parker = createParker(t);
137             }
138         }
139         return _parker;
140     }
141 
142     static Parker getParker(Thread t) {
143         if(t is Thread.getThis())
144             return getParker();
145         
146         ThreadEx tx = cast(ThreadEx)t;
147         if(tx !is null) {
148             return tx.parker();
149         } else {
150             Parker* itemPtr = t in parkers;
151             if(itemPtr is null) {
152                 _parker = createParker(t);
153             }
154 
155             return *itemPtr;
156         }
157     }
158 
159     private static Parker createParker(Thread t) {
160         version(HUNT_DEBUG) info("creating a new parker for " ~ typeid(t).name);
161         Parker p = Parker.allocate(t);
162 
163         while(!cas(&m_lock, false, true)) {
164             // waitting...
165         }
166 
167         parkers[t] = p;
168         m_lock = false;
169 
170         return p;
171     }
172 
173     static void removeParker() {
174         removeParker(Thread.getThis);
175         _parker = null;
176     }
177 
178     static void removeParker(Thread t) {
179         while(!cas(&m_lock, false, true)) {
180         }
181         parkers.remove(t);
182         m_lock = false;
183     }
184 
185     /**
186      * Makes available the permit for the given thread, if it
187      * was not already available.  If the thread was blocked on
188      * {@code park} then it will unblock.  Otherwise, its next call
189      * to {@code park} is guaranteed not to block. This operation
190      * is not guaranteed to have any effect at all if the given
191      * thread has not been started.
192      *
193      * @param thread the thread to unpark, or {@code null}, in which case
194      *        this operation has no effect
195      */
196     static void unpark(Thread thread) {
197         getParker(thread).unpark();
198     }
199 
200     static void unpark() {
201         getParker().unpark();
202     }
203 
204 
205     /**
206      * Disables the current thread for thread scheduling purposes unless the
207      * permit is available.
208      *
209      * <p>If the permit is available then it is consumed and the call
210      * returns immediately; otherwise the current thread becomes disabled
211      * for thread scheduling purposes and lies dormant until one of three
212      * things happens:
213      *
214      * <ul>
215      *
216      * <li>Some other thread invokes {@link #unpark unpark} with the
217      * current thread as the target; or
218      *
219      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
220      * the current thread; or
221      *
222      * <li>The call spuriously (that is, for no reason) returns.
223      * </ul>
224      *
225      * <p>This method does <em>not</em> report which of these caused the
226      * method to return. Callers should re-check the conditions which caused
227      * the thread to park in the first place. Callers may also determine,
228      * for example, the interrupt status of the thread upon return.
229      */
230     static void park() {
231         getParker().park(Duration.zero);
232     }
233 
234     static void park(Duration time) {
235         getParker().park(time);
236     }
237 
238     /**
239      * Disables the current thread for thread scheduling purposes unless the
240      * permit is available.
241      *
242      * <p>If the permit is available then it is consumed and the call returns
243      * immediately; otherwise
244      * the current thread becomes disabled for thread scheduling
245      * purposes and lies dormant until one of three things happens:
246      *
247      * <ul>
248      * <li>Some other thread invokes {@link #unpark unpark} with the
249      * current thread as the target; or
250      *
251      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
252      * the current thread; or
253      *
254      * <li>The call spuriously (that is, for no reason) returns.
255      * </ul>
256      *
257      * <p>This method does <em>not</em> report which of these caused the
258      * method to return. Callers should re-check the conditions which caused
259      * the thread to park in the first place. Callers may also determine,
260      * for example, the interrupt status of the thread upon return.
261      *
262      * @param blocker the synchronization object responsible for this
263      *        thread parking
264      */
265     static void park(Object blocker) {
266         park(blocker, Duration.zero);
267     }
268 
269     static void park(Object blocker, Duration time) {
270         if (time >= Duration.zero) {
271             Parker p = getParker();
272             p.setBlocker(blocker);
273             p.park(time);
274             p.setBlocker(null);
275         } else {
276             warning("The time must be greater than 0.");
277         }
278     }
279 
280     /**
281      * Disables the current thread for thread scheduling purposes, for up to
282      * the specified waiting time, unless the permit is available.
283      *
284      * <p>If the permit is available then it is consumed and the call
285      * returns immediately; otherwise the current thread becomes disabled
286      * for thread scheduling purposes and lies dormant until one of four
287      * things happens:
288      *
289      * <ul>
290      * <li>Some other thread invokes {@link #unpark unpark} with the
291      * current thread as the target; or
292      *
293      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
294      * the current thread; or
295      *
296      * <li>The specified waiting time elapses; or
297      *
298      * <li>The call spuriously (that is, for no reason) returns.
299      * </ul>
300      *
301      * <p>This method does <em>not</em> report which of these caused the
302      * method to return. Callers should re-check the conditions which caused
303      * the thread to park in the first place. Callers may also determine,
304      * for example, the interrupt status of the thread, or the elapsed time
305      * upon return.
306      *
307      * @param blocker the synchronization object responsible for this
308      *        thread parking
309      * @param nanos the maximum number of nanoseconds to wait
310      */
311     
312     deprecated("Using park(Object, Duration) instead.")
313     static void parkNanos(Object blocker, long nanos) {
314         if(nanos > 0) {
315             park(blocker, dur!(TimeUnit.Nanosecond)(nanos));
316         }
317     }
318 
319     /**
320      * Disables the current thread for thread scheduling purposes, for up to
321      * the specified waiting time, unless the permit is available.
322      *
323      * <p>If the permit is available then it is consumed and the call
324      * returns immediately; otherwise the current thread becomes disabled
325      * for thread scheduling purposes and lies dormant until one of four
326      * things happens:
327      *
328      * <ul>
329      * <li>Some other thread invokes {@link #unpark unpark} with the
330      * current thread as the target; or
331      *
332      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
333      * the current thread; or
334      *
335      * <li>The specified waiting time elapses; or
336      *
337      * <li>The call spuriously (that is, for no reason) returns.
338      * </ul>
339      *
340      * <p>This method does <em>not</em> report which of these caused the
341      * method to return. Callers should re-check the conditions which caused
342      * the thread to park in the first place. Callers may also determine,
343      * for example, the interrupt status of the thread, or the elapsed time
344      * upon return.
345      *
346      * @param nanos the maximum number of nanoseconds to wait
347      */
348     deprecated("Using park(Duration) instead.")
349     static void parkNanos(long nanos) {        
350         if (nanos > 0) {
351             getParker().park(nanos.nsecs);
352         }
353     }
354     
355     /**
356      * Disables the current thread for thread scheduling purposes, until
357      * the specified deadline, unless the permit is available.
358      *
359      * <p>If the permit is available then it is consumed and the call
360      * returns immediately; otherwise the current thread becomes disabled
361      * for thread scheduling purposes and lies dormant until one of four
362      * things happens:
363      *
364      * <ul>
365      * <li>Some other thread invokes {@link #unpark unpark} with the
366      * current thread as the target; or
367      *
368      * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
369      * current thread; or
370      *
371      * <li>The specified deadline passes; or
372      *
373      * <li>The call spuriously (that is, for no reason) returns.
374      * </ul>
375      *
376      * <p>This method does <em>not</em> report which of these caused the
377      * method to return. Callers should re-check the conditions which caused
378      * the thread to park in the first place. Callers may also determine,
379      * for example, the interrupt status of the thread, or the current time
380      * upon return.
381      *
382      * @param blocker the synchronization object responsible for this
383      *        thread parking
384      * @param deadline the absolute time, in milliseconds from the Epoch,
385      *        to wait until
386      */
387     static void parkUntil(Object blocker, MonoTime deadline) {
388         Parker p = getParker();
389         p.setBlocker(blocker);
390         p.park(deadline);
391         p.setBlocker(null);
392     }
393     
394     // deprecated("Using parkUntil(Object, Duration) instead.")
395     // static void parkUntil(Object blocker, long deadline) {
396     //     parkUntil(blocker, deadline.msecs);
397     // }
398 
399     /**
400      * Disables the current thread for thread scheduling purposes, until
401      * the specified deadline, unless the permit is available.
402      *
403      * <p>If the permit is available then it is consumed and the call
404      * returns immediately; otherwise the current thread becomes disabled
405      * for thread scheduling purposes and lies dormant until one of four
406      * things happens:
407      *
408      * <ul>
409      * <li>Some other thread invokes {@link #unpark unpark} with the
410      * current thread as the target; or
411      *
412      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
413      * the current thread; or
414      *
415      * <li>The specified deadline passes; or
416      *
417      * <li>The call spuriously (that is, for no reason) returns.
418      * </ul>
419      *
420      * <p>This method does <em>not</em> report which of these caused the
421      * method to return. Callers should re-check the conditions which caused
422      * the thread to park in the first place. Callers may also determine,
423      * for example, the interrupt status of the thread, or the current time
424      * upon return.
425      *
426      * @param deadline the absolute time, in milliseconds from the Epoch,
427      *        to wait until
428      */
429     static void parkUntil(MonoTime deadline) {
430         getParker().park(deadline);
431     }
432 
433     deprecated("Using parkUntil(Duration) instead.")
434     static void parkUntil(long deadline) {
435         parkUntil(MonoTime(deadline));
436     }
437 
438 
439     /**
440      * Returns the blocker object supplied to the most recent
441      * invocation of a park method that has not yet unblocked, or null
442      * if not blocked.  The value returned is just a momentary
443      * snapshot -- the thread may have since unblocked or blocked on a
444      * different blocker object.
445      *
446      * @param t the thread
447      * @return the blocker
448      * @throws NullPointerException if argument is null
449      */
450     static Object getBlocker(Thread t) {
451         return getParker(t).getBlocker();
452     }
453 
454     /**
455      * Returns the pseudo-randomly initialized or updated secondary seed.
456      * Copied from ThreadLocalRandom due to package access restrictions.
457      */
458     // static final int nextSecondarySeed() {
459     //     int r;
460     //     Thread t = Thread.currentThread();
461     //     if ((r = U.getInt(t, SECONDARY)) != 0) {
462     //         r ^= r << 13;   // xorshift
463     //         r ^= r >>> 17;
464     //         r ^= r << 5;
465     //     }
466     //     else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0)
467     //         r = 1; // avoid zero
468     //     U.putInt(t, SECONDARY, r);
469     //     return r;
470     // }
471 
472     /**
473      * Returns the thread id for the given thread.  We must access
474      * this directly rather than via method Thread.getId() because
475      * getId() has been known to be overridden in ways that do not
476      * preserve unique mappings.
477      */
478     // static final long getThreadId(Thread thread) {
479     //     return U.getLong(thread, TID);
480     // }
481 
482 }