1 module hunt.util.pool.ObjectPool;
2 
3 import hunt.concurrency.Future;
4 import hunt.concurrency.Promise;
5 import hunt.concurrency.FuturePromise;
6 import hunt.logging.ConsoleLogger;
7 
8 import core.sync.mutex;
9 
10 import std.container.dlist;
11 import core.time;
12 import std.format;
13 import std.range : walkLength;
14 
15 import hunt.util.pool.ObjectFactory;
16 import hunt.util.pool.PooledObject;
17 
/**
 * Selects when a pool is expected to create its objects.
 *
 * NOTE(review): the meaning of the members is inferred from their names
 * (Lazy = create on demand, Eager = create up front) -- confirm against
 * the code that consumes this option.
 */
enum CreationMode {
    /// First member, therefore also `CreationMode.init`.
    Lazy = 0,
    Eager = 1
}
25 
/**
 * Configuration handed to an ObjectPool when it is constructed.
 */
class PoolOptions {
    /// Number of slots the pool allocates at construction time (default: 5).
    size_t size = 5;

    /// Requested object creation strategy (default: Lazy).
    /// NOTE(review): confirm where this option is consumed; intended
    /// semantics are inferred from the member names only.
    CreationMode creationMode = CreationMode.Lazy;
}
33 
34 
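/**
 * A pool of reusable objects of type T with a fixed number of slots.
 *
 * Objects are produced through an ObjectFactory when a slot is first needed,
 * handed out by borrow()/borrowAsync() and given back with returnObject().
 * Callers that cannot be served immediately by borrowAsync() are queued as
 * pending promises and are served when an object is returned.
 */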
38 class ObjectPool(T) {
39     private ObjectFactory!(T) _factory;
40     private PooledObject!(T)[] _pooledObjects;
41     private Mutex _locker;
42     private DList!(FuturePromise!T) _waiters;
43     private PoolOptions _poolOptions;
44 
45     this(PoolOptions options) {
46         this(new DefaultObjectFactory!(T)(), options);
47     }
48 
49     this(ObjectFactory!(T) factory, PoolOptions options) {
50         _factory = factory;
51         _poolOptions = options;
52         _pooledObjects = new PooledObject!(T)[options.size];
53         _locker = new Mutex();
54     }
55 
56     size_t size() {
57         return _poolOptions.size;
58     }
59 
60     /**
61      * Obtains an instance from this pool.
62      * <p>
63      * By contract, clients <strong>must</strong> return the borrowed instance
64      * using {@link #returnObject}, {@link #invalidateObject}, or a related
65      * method as defined in an implementation or sub-interface.
66      * </p>
67      * <p>
68      * The behaviour of this method when the pool has been exhausted
69      * is not strictly specified (although it may be specified by
70      * implementations).
71      * </p>
72      *
73      * @return an instance from this pool.
74      */
75     T borrow(Duration timeout = 10.seconds, bool isQuiet = true) {
76         T r;
77         if(timeout == Duration.zero) {
78             _locker.lock();
79             scope(exit) {
80                 _locker.unlock();
81             }
82 
83             r = doBorrow();
84             if(r is null && !isQuiet) {
85                 throw new Exception("No idle object avaliable.");
86             }
87         } else {
88             Future!T future = borrowAsync();
89             if(timeout.isNegative()) {
90                 r = future.get();
91             } else {
92                 r = future.get(timeout);
93             }
94         }
95         return r;
96     }    
97 
98 
99     /**
100      * 
101      */
102     Future!T borrowAsync() {
103         _locker.lock();
104         scope(exit) {
105             _locker.unlock();
106         }
107         
108         FuturePromise!T promise = new FuturePromise!T();
109 
110         if(_waiters.empty()) {
111             T r = doBorrow();
112             if(r is null) {
113                 _waiters.stableInsert(promise);
114                 version(HUNT_DEBUG) {
115                     warningf("New waiter...%d", getNumWaiters());
116                 }
117             } else {
118                 promise.succeeded(r);
119             }
120         } else {
121             _waiters.stableInsert(promise);
122             version(HUNT_DEBUG) {
123                 warningf("New waiter...%d", getNumWaiters());
124             }
125         }
126 
127         return promise;
128     }
129 
130     /**
131      * 
132      */
133     private T doBorrow() {
134         PooledObject!(T) pooledObj;
135 
136         for(size_t index; index<_pooledObjects.length; index++) {
137             pooledObj = _pooledObjects[index];
138 
139             if(pooledObj is null) {
140                 T underlyingObj = _factory.makeObject();
141                 pooledObj = new PooledObject!(T)(underlyingObj);
142                 _pooledObjects[index] = pooledObj;
143                 break;
144             } else if(pooledObj.isIdle()) {
145                 T underlyingObj = pooledObj.getObject();
146                 bool isValid = _factory.isValid(underlyingObj);
147                 if(!isValid) {
148                     pooledObj.invalidate();
149                     version(HUNT_DEBUG) {
150                         warningf("An invalid object (id=%d) detected at slot %d.", pooledObj.id, index);
151                     }
152                     _factory.destroyObject(underlyingObj);
153                     underlyingObj = _factory.makeObject();
154                     pooledObj = new PooledObject!(T)(underlyingObj);
155                     _pooledObjects[index] = pooledObj;
156                 }
157                 break;
158             } else if(pooledObj.isInvalid()) {
159                 T underlyingObj = pooledObj.getObject();
160                 version(HUNT_DEBUG) {
161                     warningf("An invalid object (id=%d) detected at slot %d.", pooledObj.id, index);
162                 }
163                 _factory.destroyObject(underlyingObj);
164                 underlyingObj = _factory.makeObject();
165                 pooledObj = new PooledObject!(T)(underlyingObj);
166                 _pooledObjects[index] = pooledObj;
167                 break;
168             }
169 
170             pooledObj = null;
171         }
172         
173         if(pooledObj is null) {
174             version(HUNT_DEBUG) {
175                 warning("No idle object avaliable.");
176             }
177             return null;
178         }
179         
180         pooledObj.allocate();
181 
182         version(HUNT_DEBUG) {
183             infof("borrowed: id=%d, createTime=%s; pool status = { %s }", 
184                 pooledObj.id, pooledObj.createTime(), toString()); 
185         }
186         return pooledObj.getObject();        
187     }
188 
189     /**
190      * Returns an instance to the pool. By contract, <code>obj</code>
191      * <strong>must</strong> have been obtained using {@link #borrowObject()} or
192      * a related method as defined in an implementation or sub-interface.
193      *
194      * @param obj a {@link #borrowObject borrowed} instance to be returned.
195      */
196     void returnObject(T obj) {
197         if(obj is null) {
198             version(HUNT_DEBUG) warning("Do nothing for a null object");
199             return;
200         }
201 
202         scope(exit) {
203             _locker.lock();
204             scope(exit) {
205                 _locker.unlock();
206             }
207             handleWaiters();
208         }
209 
210         doReturning(obj);
211     } 
212 
213     private bool doReturning(T obj) {
214         bool result = false;
215 
216         PooledObject!(T) pooledObj;
217         for(size_t index; index<_pooledObjects.length; index++) {
218             pooledObj = _pooledObjects[index];
219             if(pooledObj is null) {
220                 continue;
221             }
222             
223             T underlyingObj = pooledObj.getObject();
224             if(underlyingObj is obj) {
225                 version(HUNT_DEBUG_MORE) {
226                     tracef("returning: id=%d, state=%s, count=%s, createTime=%s", 
227                         pooledObj.id, pooledObj.state(), pooledObj.borrowedCount(), pooledObj.createTime()); 
228                 }
229                     
230                 // pooledObj.returning();
231                 result = pooledObj.deallocate();
232                 version(HUNT_DEBUG) {
233                     if(result) {
234                         infof("Returned: id=%d", pooledObj.id);
235                     } else {
236                         warningf("Return failed: id=%d", pooledObj.id);
237                     }
238                 }
239                 break;
240             }
241         }
242 
243         version(HUNT_DEBUG) {
244             info(toString());
245         }
246         return result;
247     }
248 
249     private void handleWaiters() {
250         if(_waiters.empty())
251             return;
252         
253         FuturePromise!T waiter = _waiters.front();
254 
255         // clear up all the finished waiter
256         while(waiter.isDone()) {
257             _waiters.removeFront();
258             if(_waiters.empty()) {
259                 return;
260             }
261 
262             waiter = _waiters.front();
263         }
264 
265         // 
266         T r = doBorrow();
267         if(r is null) {
268             warning("No idle object avaliable for waiter");
269         } else {
270             _waiters.removeFront();
271             try {
272                 waiter.succeeded(r);
273             } catch(Exception ex) {
274                 warning(ex);
275             }
276         }
277     }
278 
279     /**
280      * Returns the number of instances currently idle in this pool. This may be
281      * considered an approximation of the number of objects that can be
282      * {@link #borrowObject borrowed} without creating any new instances.
283      * Returns a negative value if this information is not available.
284      * @return the number of instances currently idle in this pool.
285      */
286     size_t getNumIdle() {
287         size_t count = 0;
288 
289         foreach(PooledObject!(T) obj; _pooledObjects) {
290             if(obj is null || obj.isIdle()) {
291                 count++;
292             } 
293         }
294 
295         return count;
296     }
297 
298     /**
299      * Returns the number of instances currently borrowed from this pool. Returns
300      * a negative value if this information is not available.
301      * @return the number of instances currently borrowed from this pool.
302      */
303     size_t getNumActive() {
304         size_t count = 0;
305 
306         foreach(PooledObject!(T) obj; _pooledObjects) {
307             if(obj !is null && obj.isInUse()) {
308                 count++;
309             } 
310         }
311 
312         return count;        
313     }
314 
315     /**
316      * Returns an estimate of the number of threads currently blocked waiting for
317      * an object from the pool. This is intended for monitoring only, not for
318      * synchronization control.
319      *
320      * @return The estimate of the number of threads currently blocked waiting
321      *         for an object from the pool
322      */
323     size_t getNumWaiters() {
324         return walkLength(_waiters[]);
325     }
326 
327     /**
328      * Clears any objects sitting idle in the pool, releasing any associated
329      * resources (optional operation). Idle objects cleared must be
330      * {@link PooledObjectFactory#destroyObject(PooledObject)}.
331      *
332      * @throws Exception if the pool cannot be cleared
333      */
334     void clear() {
335         version(HUNT_DEBUG) {
336             info("Pool is clearing...");
337         }
338 
339         _locker.lock();
340         scope(exit) {
341             _locker.unlock();
342         }
343 
344         for(size_t index; index<_pooledObjects.length; index++) {
345             PooledObject!(T) obj = _pooledObjects[index];
346 
347             if(obj !is null) {
348                 version(HUNT_DEBUG) {
349                     tracef("clearing object: id=%d, slot=%d", obj.id, index);
350                 }
351 
352                 _pooledObjects[index] = null;
353                 obj.abandoned();
354                 _factory.destroyObject(obj.getObject());
355             }
356         }
357     }
358 
359     /**
360      * Closes this pool, and free any resources associated with it.
361      * <p>
362      * Calling {@link #borrowObject} after invoking this
363      * method on a pool will cause them to throw an {@link IllegalStateException}.
364      * </p>
365      * <p>
366      * Implementations should silently fail if not all resources can be freed.
367      * </p>
368      */
369     void close() {
370         version(HUNT_DEBUG) {
371             info("Pool is closing...");
372         }
373 
374         _locker.lock();
375         scope(exit) {
376             _locker.unlock();
377         }
378 
379         for(size_t index; index<_pooledObjects.length; index++) {
380             PooledObject!(T) obj = _pooledObjects[index];
381 
382             if(obj !is null) {
383                 version(HUNT_DEBUG) {
384                     tracef("destroying object: id=%d, slot=%d", obj.id, index);
385                 }
386 
387                 _pooledObjects[index] = null;
388                 obj.abandoned();
389                 _factory.destroyObject(obj.getObject());
390             }
391         }
392 
393     }
394 
395     override string toString() {
396         string str = format("Total: %d, Active: %d, Idle: %d, Waiters: %d", 
397                 size(), getNumActive(),  getNumIdle(), getNumWaiters());
398         return str;
399     }
400 }