1 module hunt.util.pool.ObjectPool;
2 
3 import hunt.concurrency.Future;
4 import hunt.concurrency.Promise;
5 import hunt.concurrency.FuturePromise;
6 import hunt.logging;
7 
8 import core.atomic;
9 import core.sync.mutex;
10 import core.time;
11 
12 import std.container.dlist;
13 import std.conv;
14 import std.format;
15 import std.range : walkLength;
16 
17 import hunt.util.pool.ObjectFactory;
18 import hunt.util.pool.PooledObject;
19 import hunt.util.pool.PooledObjectState;
20 
21 /**
22  * 
23  */
24 enum CreationMode {
25     Lazy,
26     Eager
27 }
28 
29 /**
30  * 
31  */
32 class PoolOptions {
33     size_t size = 5;
34     int maxWaitQueueSize = -1;
35     string name;
36     Duration waitTimeout = 15.seconds;
37     CreationMode creationMode = CreationMode.Lazy;
38 }
39 
40 
41 enum ObjectPoolState {
42     Open,
43     Closing,
44     Closed
45 }
46 
47 /**
48  * 
49  */
50 class ObjectPool(T) {
51 
52     private PoolOptions _poolOptions;
53     private shared ObjectPoolState _state = ObjectPoolState.Open;
54     private shared bool _isClearing = false;
55     private shared int _waiterNumber = 0;
56     private ObjectFactory!(T) _factory;
57     private PooledObject!(T)[] _pooledObjects;
58     private Mutex _borrowLocker;
59     private Mutex _waitersLocker;
60     private DList!(FuturePromise!T) _waiters;
61 
62     static if(is(T == class) && __traits(compiles, new T())) {
63         this(PoolOptions options) {
64             this(new DefaultObjectFactory!(T)(), options);
65         } 
66     }
67 
68     this(ObjectFactory!(T) factory, PoolOptions options) {
69         _factory = factory;
70         _poolOptions = options;
71         _pooledObjects = new PooledObject!(T)[options.size];
72         _waitersLocker = new Mutex();
73         _borrowLocker = new Mutex();
74     }
75 
76     ObjectPoolState state() {
77         return _state;
78     }
79 
80     size_t size() {
81         return _poolOptions.size;
82     }
83 
84     /**
85      * Obtains an instance from this pool.
86      * <p>
87      * By contract, clients <strong>must</strong> return the borrowed instance
88      * using {@link #returnObject}, {@link #invalidateObject}, or a related
89      * method as defined in an implementation or sub-interface.
90      * </p>
91      * <p>
92      * The behaviour of this method when the pool has been exhausted
93      * is not strictly specified (although it may be specified by
94      * implementations).
95      * </p>
96      *
97      * @return an instance from this pool.
98      */
99     T borrow() {
100        return borrow(_poolOptions.waitTimeout, true);
101     }
102 
103     T borrow(Duration timeout, bool isQuiet = true) {
104         T r;
105         if(timeout == Duration.zero) {
106             r = doBorrow();
107             if(r is null && !isQuiet) {
108                 throw new Exception("No idle object avaliable.");
109             }
110         } else {
111             Future!T future = borrowAsync();
112             if(timeout.isNegative()) {
113                 version(HUNT_POOL_DEBUG) {
114                     tracef("Borrowing with promise [%s]...", (cast(FuturePromise!T)future).id());
115                 }   
116                 r = future.get();
117             } else {
118                 version(HUNT_POOL_DEBUG) {
119                     tracef("Borrowing with promise [%s] in %s", (cast(FuturePromise!T)future).id(), timeout);
120                 }                
121                 r = future.get(timeout);
122             }
123 
124             version(HUNT_POOL_DEBUG_MORE) {
125                 tracef("Borrowed {%s} from promise [%s]", r.to!string(), (cast(FuturePromise!T)future).id());
126             }
127         }
128         
129         return r;
130     }    
131 
132 
133     /**
134      * 
135      */
136     Future!T borrowAsync() {
137         version (HUNT_POOL_DEBUG) infof("Borrowing...%s", toString());
138 
139         int number = atomicOp!("+=")(_waiterNumber, 1) - 1;
140         FuturePromise!T promise = new FuturePromise!T("PoolWaiter " ~ number.to!string());
141         size_t waitNumber = getNumWaiters();
142         version(HUNT_POOL_DEBUG_MORE) {
143             tracef("Pool: %s, new waiter [%s], current waitNumber: %d", _poolOptions.name, promise.id(), waitNumber);
144         }      
145 
146         if(_poolOptions.maxWaitQueueSize == -1 || waitNumber < _poolOptions.maxWaitQueueSize) {
147             safeInsertWaiter(promise);
148         } else {
149             string msg = format("Reach to the max WaitNumber (%d), the current: %d", 
150                 _poolOptions.maxWaitQueueSize, waitNumber);
151 
152             version(HUNT_DEBUG) {
153                 warning(msg);
154             }
155             promise.failed(new Exception(msg));
156         }  
157 
158         handleWaiters();
159         return promise;
160     }
161 
162     private void safeInsertWaiter(FuturePromise!T promise) {
163         _waitersLocker.lock();
164         scope(exit) {
165             _waitersLocker.unlock();
166         }
167 
168         _waiters.stableInsert(promise);
169     }
170 
171     private FuturePromise!T safeGetFrontWaiter() {
172         _waitersLocker.lock();
173         scope(exit) {
174             _waitersLocker.unlock();
175         }
176 
177         // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-10-10T21:13:56+08:00
178         // More test
179         if(_waiters.empty) return null;
180 
181         FuturePromise!T waiter = _waiters.front();
182 
183         // Clear up all the finished waiter until a awaiting waiter found
184         while(waiter.isDone()) {
185             version(HUNT_POOL_DEBUG_MORE) 
186                 tracef("Waiter %s is done, so removed.", waiter.id());
187             _waiters.removeFront();
188             
189             if(_waiters.empty()) {
190                 version(HUNT_POOL_DEBUG) trace("No awaiting waiter found.");
191                 return null;
192             }
193 
194             waiter = _waiters.front();
195         } 
196         _waiters.removeFront(); 
197 
198         return waiter;       
199     }
200 
201     private void safeRemoveFrontWaiter() {
202 
203         _waitersLocker.lock();
204         scope(exit) {
205             _waitersLocker.unlock();
206         } 
207 
208         _waiters.removeFront();       
209     }
210 
211 
212     /**
213      * 
214      */
215     private T doBorrow() {
216     // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-10-10T16:16:24+08:00        
217     // nothrow
218 
219         _borrowLocker.lock();
220         bool isUnlocked = false;
221 
222         scope(exit) {
223             version(HUNT_POOL_DEBUG_MORE) warningf("isUnlocked: %s...", isUnlocked);
224 
225             if(!isUnlocked) {
226                 _borrowLocker.unlock();
227             }
228         }
229 
230         PooledObject!(T) pooledObj;
231         T underlyingObj = null;
232         bool r = false;
233         size_t index = 0;
234 
235         for(; index<_pooledObjects.length; index++) {
236             pooledObj = _pooledObjects[index];
237 
238             version(HUNT_POOL_DEBUG_MORE) {
239                 tracef("Pool: %s, slot[%d] => %s", _poolOptions.name, index, pooledObj.to!string());
240             }
241 
242             if(pooledObj is null) {
243                 PooledObject!(T) obj = new PooledObject!(T)();
244                 _pooledObjects[index] = obj;
245                 pooledObj = obj;
246                 isUnlocked = true;
247                 _borrowLocker.unlock();
248 
249                 version(HUNT_POOL_DEBUG_MORE) {
250                     tracef("Pool: %s, binding slot[%d] => %s", _poolOptions.name, index, obj.toString());
251                 }
252 
253                 try {
254                     underlyingObj = _factory.makeObject();
255 
256                     // tracef("Pool: %s, binded slot[%d] locking...", _poolOptions.name, index);
257                     // _borrowLocker.lock();
258                     // tracef("Pool: %s, binded slot[%d] locked... underlyingObj is null: %s", _poolOptions.name, index, underlyingObj is null);
259                     
260                     // obj.bind(underlyingObj);
261                     r = pooledObj.allocate(underlyingObj);
262                     // isUnlocked = true;
263                     // _borrowLocker.unlock();
264                 } catch(Throwable t) {
265                     warning(t.msg);
266                     version(HUNT_DEBUG) warning(t);
267                     pooledObj = null;
268                     _pooledObjects[index] = null;
269                 }
270                 
271                 version(HUNT_POOL_DEBUG) {
272                     tracef("Pool: %s, binded slot[%d] => %s", _poolOptions.name, index,  obj.toString());
273                 }
274 
275                 break;
276             } else if(pooledObj.isIdle()) {
277                 underlyingObj = pooledObj.getObject();
278                 bool isValid = _factory.isValid(underlyingObj);
279                 if(isValid) {
280                     r = pooledObj.allocate();
281                     // isUnlocked = true;
282                     // _borrowLocker.unlock();
283                     break;
284                 } else {
285                     pooledObj.invalidate();
286                     version(HUNT_POOL_DEBUG) {
287                         warningf("Pool: %s. An invalid object (id=%d) detected at slot %d.", 
288                             _poolOptions.name, pooledObj.id, index);
289                     }
290                 }
291             } else if(pooledObj.isInvalid()) {
292                 underlyingObj = pooledObj.getObject();
293                 version(HUNT_POOL_DEBUG) {
294                     warningf("Pool: %s. An invalid object (id=%d) detected at slot %d.", 
295                         _poolOptions.name, pooledObj.id, index);
296                 }
297                 _pooledObjects[index] = null;
298                 // isUnlocked = true;
299                 // _borrowLocker.unlock();
300                 _factory.destroyObject(underlyingObj);
301                 break;
302             }
303 
304             pooledObj = null;
305         }
306         
307         if(pooledObj is null) {
308             version(HUNT_DEBUG) {
309                 warningf("Failed to borrow from {%s}",  toString());
310             }
311             return null;
312         }
313         
314         if(r) {
315             version(HUNT_POOL_DEBUG) {
316                 infof("Pool: %s, allocate: %s, borrowed: {%s}", _poolOptions.name, r, pooledObj.toString()); 
317             }
318             return underlyingObj;
319         } else {
320             warningf("Pool: %s, borrowing collision: slot[%d]", _poolOptions.name, index, pooledObj.toString());
321             return null;
322         }
323     }
324 
325     /**
326      * Returns an instance to the pool. By contract, <code>obj</code>
327      * <strong>must</strong> have been obtained using {@link #borrowObject()} or
328      * a related method as defined in an implementation or sub-interface.
329      *
330      * @param obj a {@link #borrowObject borrowed} instance to be returned.
331      */
332     void returnObject(T obj) {
333         if(obj !is null) {
334             doReturning(obj);
335         }
336 
337         handleWaiters();
338     } 
339 
340     private bool doReturning(T obj) {
341         // _pooledObjectsLocker.lock();
342         // scope(exit) {
343         //     _pooledObjectsLocker.unlock();
344         // }
345 
346         bool result = false;
347 
348         PooledObject!(T) pooledObj;
349         for(size_t index; index<_pooledObjects.length; index++) {
350             pooledObj = _pooledObjects[index];
351             if(pooledObj is null) {
352                 continue;
353             }
354             
355             T underlyingObj = pooledObj.getObject();
356             if(underlyingObj is obj) {
357                 version(HUNT_POOL_DEBUG_MORE) {
358                     tracef("Pool: %s, slot: %d, returning: {%s}", _poolOptions.name, index, pooledObj.toString()); 
359                 }
360                     
361                 result = pooledObj.returning();
362                 
363                 if(result) {
364                     version(HUNT_POOL_DEBUG) {
365                         tracef("Pool: %s; slot: %d, Returned: {%s}", 
366                             _poolOptions.name, index, pooledObj.toString());
367                     }
368                 } else {
369                     errorf("Pool: %s, slot: %d, Return failed: {%s}", _poolOptions.name, index, pooledObj.toString());
370                 }
371                 break;
372             }
373         }
374 
375         version(HUNT_DEBUG) {
376             info(toString());
377         }
378         // info(toString());
379         return result;
380     }
381 
382     private void handleWaiters() {
383         if(_state == ObjectPoolState.Closing || _state == ObjectPoolState.Closed) {
384             return;
385         }
386 
387         if(_state != ObjectPoolState.Open) {
388             warningf("Failed to query the waiters. The state is %s.", _state);
389             return;
390         }
391 
392         while(true) {
393             
394             if(_waiters.empty()) {
395                 version(HUNT_POOL_DEBUG) warningf("Pool: %s => No waiter avaliable.", _poolOptions.name);
396                 break;
397             }
398 
399             // 
400             T r = doBorrow();
401             if(r is null) {
402                 version(HUNT_POOL_DEBUG) warningf("Pool: %s => No idle object avaliable", _poolOptions.name);
403                 break;
404             } 
405 
406             FuturePromise!T waiter = safeGetFrontWaiter();
407             if(waiter is null) {
408                 doReturning(r);
409                 break;
410             }
411 
412             version(HUNT_POOL_DEBUG) {
413                 tracef("Borrowing for waiter [%s], isDone: %s", waiter.id(), waiter.isDone());
414             }
415 
416             //
417             try {
418                 if(waiter.succeeded(r)) {
419                     version(HUNT_POOL_DEBUG) {
420                         tracef("Borrowed for waiter [%s], result: %s", waiter.id(), (cast(Object)r).toString());
421                     }                    
422                 } else {
423                     warningf("Failed to set the result for promise [%s] with %s", 
424                         waiter.id(), (cast(Object)r).toString());
425 
426                     doReturning(r);
427                 }
428             } catch(Throwable ex) {
429                 warning(ex);
430                 doReturning(r);
431             }
432         }
433     }
434 
435     /**
436      * Returns the number of instances currently idle in this pool. This may be
437      * considered an approximation of the number of objects that can be
438      * {@link #borrowObject borrowed} without creating any new instances.
439      * Returns a negative value if this information is not available.
440      * @return the number of instances currently idle in this pool.
441      */
442     size_t getNumIdle() {
443         size_t count = 0;
444 
445         foreach(PooledObject!(T) obj; _pooledObjects) {
446             if(obj !is null && obj.isIdle()) {
447                 count++;
448             } 
449         }
450 
451         return count;
452     }
453 
454     size_t getNumFree() {
455 
456         size_t count = 0;
457 
458         foreach(PooledObject!(T) obj; _pooledObjects) {
459             if(obj is null || obj.isUnusable() || obj.isInvalid()) {
460                 count++;
461             } 
462         }
463 
464         return count;        
465     }
466 
467     /**
468      * Returns the number of instances currently borrowed from this pool. Returns
469      * a negative value if this information is not available.
470      * @return the number of instances currently borrowed from this pool.
471      */
472     size_t getNumActive() {
473         size_t count = 0;
474 
475         foreach(PooledObject!(T) obj; _pooledObjects) {
476             if(obj !is null && obj.isInUse()) {
477                 count++;
478             } 
479         }
480 
481         return count;        
482     }
483 
484     /**
485      * Returns an estimate of the number of threads currently blocked waiting for
486      * an object from the pool. This is intended for monitoring only, not for
487      * synchronization control.
488      *
489      * @return The estimate of the number of threads currently blocked waiting
490      *         for an object from the pool
491      */
492     size_t getNumWaiters() {
493         _waitersLocker.lock();
494         scope(exit) {
495             _waitersLocker.unlock();
496         } 
497         return walkLength(_waiters[]);
498     }
499 
500     /**
501      * Clears any objects sitting idle in the pool, releasing any associated
502      * resources (optional operation). Idle objects cleared must be
503      * {@link PooledObjectFactory#destroyObject(PooledObject)}.
504      *
505      * @throws Exception if the pool cannot be cleared
506      */
507     void clear() {
508         version(HUNT_POOL_DEBUG) {
509             infof("Pool [%s] is clearing...", _poolOptions.name);
510         }
511 
512         bool r = cas(&_isClearing, false, true);
513         if(!r) {
514             return;
515         }
516 
517         _borrowLocker.lock();
518         scope(exit) {
519             _isClearing = false;
520             version(HUNT_POOL_DEBUG) infof("Pool [%s] is cleared...", _poolOptions.name);
521             _borrowLocker.unlock();        
522         }
523 
524         for(size_t index; index<_pooledObjects.length; index++) {
525             PooledObject!(T) obj = _pooledObjects[index];
526 
527             if(obj !is null) {
528                 version(HUNT_POOL_DEBUG) {
529                     tracef("clearing object: id=%d, slot=%d", obj.id, index);
530                 }
531 
532                 _pooledObjects[index] = null;
533                 obj.abandoned();
534 
535                 // TODO: It's better to run it asynchronously
536                 _factory.destroyObject(obj.getObject());
537             }
538         }
539     }
540 
541     /**
542      * Closes this pool, and free any resources associated with it.
543      * <p>
544      * Calling {@link #borrowObject} after invoking this
545      * method on a pool will cause them to throw an {@link IllegalStateException}.
546      * </p>
547      * <p>
548      * Implementations should silently fail if not all resources can be freed.
549      * </p>
550      */
551     void close() {
552         version(HUNT_DEBUG) {
553             // infof("Closing pool %s (state=%s)...", _poolOptions.name, _state);
554             tracef(toString());
555         }
556 
557         bool r = cas(&_state, ObjectPoolState.Open, ObjectPoolState.Closing);
558         if(!r) {
559             return;
560         }
561 
562         scope(exit) {
563             _state = ObjectPoolState.Closed;
564             version(HUNT_POOL_DEBUG) {
565                 infof("Pool %s closed...", _poolOptions.name);
566             }
567         }
568 
569         for(size_t index; index<_pooledObjects.length; index++) {
570             PooledObject!(T) obj = _pooledObjects[index];
571 
572             if(obj !is null) {
573                 version(HUNT_POOL_DEBUG) {
574                     tracef("Pool: %s, destroying object: id=%d, slot=%d, state: %s", 
575                         _poolOptions.name,  obj.id, index, obj.state());
576 
577                     if(obj.state() == PooledObjectState.ALLOCATED) {
578                         warningf("binded obj: %s", (cast(Object)obj.getObject()).toString());
579                     }
580                 }
581 
582                 _pooledObjects[index] = null;
583                 obj.abandoned();
584 
585                 // TODO: It's better to run it asynchronously
586                 _factory.destroyObject(obj.getObject());
587             }
588         }
589 
590     }
591 
592     override string toString() {
593         string str = format("Name: %s, Total: %d, Active: %d, Idle: %d, Free: %d, Waiters: %d", 
594                 _poolOptions.name, size(), getNumActive(), getNumIdle(), getNumFree(), getNumWaiters());
595         return str;
596     }
597 }