1 module hunt.util.pool.ObjectPool;
2
3 import hunt.concurrency.Future;
4 import hunt.concurrency.Promise;
5 import hunt.concurrency.FuturePromise;
6 import hunt.logging;
7
8 import core.atomic;
9 import core.sync.mutex;
10 import core.time;
11
12 import std.container.dlist;
13 import std.conv;
14 import std.format;
15 import std.range : walkLength;
16
17 import hunt.util.pool.ObjectFactory;
18 import hunt.util.pool.PooledObject;
19 import hunt.util.pool.PooledObjectState;
20
21 /**
22 *
23 */
24 enum CreationMode {
25 Lazy,
26 Eager
27 }
28
29 /**
30 *
31 */
32 class PoolOptions {
33 size_t size = 5;
34 int maxWaitQueueSize = -1;
35 string name;
36 Duration waitTimeout = 15.seconds;
37 CreationMode creationMode = CreationMode.Lazy;
38 }
39
40
41 enum ObjectPoolState {
42 Open,
43 Closing,
44 Closed
45 }
46
47 /**
48 *
49 */
50 class ObjectPool(T) {
51
52 private PoolOptions _poolOptions;
53 private shared ObjectPoolState _state = ObjectPoolState.Open;
54 private shared bool _isClearing = false;
55 private shared int _waiterNumber = 0;
56 private ObjectFactory!(T) _factory;
57 private PooledObject!(T)[] _pooledObjects;
58 private Mutex _borrowLocker;
59 private Mutex _waitersLocker;
60 private DList!(FuturePromise!T) _waiters;
61
62 static if(is(T == class) && __traits(compiles, new T())) {
63 this(PoolOptions options) {
64 this(new DefaultObjectFactory!(T)(), options);
65 }
66 }
67
68 this(ObjectFactory!(T) factory, PoolOptions options) {
69 _factory = factory;
70 _poolOptions = options;
71 _pooledObjects = new PooledObject!(T)[options.size];
72 _waitersLocker = new Mutex();
73 _borrowLocker = new Mutex();
74 }
75
76 ObjectPoolState state() {
77 return _state;
78 }
79
80 size_t size() {
81 return _poolOptions.size;
82 }
83
84 /**
85 * Obtains an instance from this pool.
86 * <p>
87 * By contract, clients <strong>must</strong> return the borrowed instance
88 * using {@link #returnObject}, {@link #invalidateObject}, or a related
89 * method as defined in an implementation or sub-interface.
90 * </p>
91 * <p>
92 * The behaviour of this method when the pool has been exhausted
93 * is not strictly specified (although it may be specified by
94 * implementations).
95 * </p>
96 *
97 * @return an instance from this pool.
98 */
99 T borrow() {
100 return borrow(_poolOptions.waitTimeout, true);
101 }
102
103 T borrow(Duration timeout, bool isQuiet = true) {
104 T r;
105 if(timeout == Duration.zero) {
106 r = doBorrow();
107 if(r is null && !isQuiet) {
108 throw new Exception("No idle object avaliable.");
109 }
110 } else {
111 Future!T future = borrowAsync();
112 if(timeout.isNegative()) {
113 version(HUNT_POOL_DEBUG) {
114 tracef("Borrowing with promise [%s]...", (cast(FuturePromise!T)future).id());
115 }
116 r = future.get();
117 } else {
118 version(HUNT_POOL_DEBUG) {
119 tracef("Borrowing with promise [%s] in %s", (cast(FuturePromise!T)future).id(), timeout);
120 }
121 r = future.get(timeout);
122 }
123
124 version(HUNT_POOL_DEBUG_MORE) {
125 tracef("Borrowed {%s} from promise [%s]", r.to!string(), (cast(FuturePromise!T)future).id());
126 }
127 }
128
129 return r;
130 }
131
132
133 /**
134 *
135 */
136 Future!T borrowAsync() {
137 version (HUNT_POOL_DEBUG) infof("Borrowing...%s", toString());
138
139 int number = atomicOp!("+=")(_waiterNumber, 1) - 1;
140 FuturePromise!T promise = new FuturePromise!T("PoolWaiter " ~ number.to!string());
141 size_t waitNumber = getNumWaiters();
142 version(HUNT_POOL_DEBUG_MORE) {
143 tracef("Pool: %s, new waiter [%s], current waitNumber: %d", _poolOptions.name, promise.id(), waitNumber);
144 }
145
146 if(_poolOptions.maxWaitQueueSize == -1 || waitNumber < _poolOptions.maxWaitQueueSize) {
147 safeInsertWaiter(promise);
148 } else {
149 string msg = format("Reach to the max WaitNumber (%d), the current: %d",
150 _poolOptions.maxWaitQueueSize, waitNumber);
151
152 version(HUNT_DEBUG) {
153 warning(msg);
154 }
155 promise.failed(new Exception(msg));
156 }
157
158 handleWaiters();
159 return promise;
160 }
161
162 private void safeInsertWaiter(FuturePromise!T promise) {
163 _waitersLocker.lock();
164 scope(exit) {
165 _waitersLocker.unlock();
166 }
167
168 _waiters.stableInsert(promise);
169 }
170
171 private FuturePromise!T safeGetFrontWaiter() {
172 _waitersLocker.lock();
173 scope(exit) {
174 _waitersLocker.unlock();
175 }
176
177 // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-10-10T21:13:56+08:00
178 // More test
179 if(_waiters.empty) return null;
180
181 FuturePromise!T waiter = _waiters.front();
182
183 // Clear up all the finished waiter until a awaiting waiter found
184 while(waiter.isDone()) {
185 version(HUNT_POOL_DEBUG_MORE)
186 tracef("Waiter %s is done, so removed.", waiter.id());
187 _waiters.removeFront();
188
189 if(_waiters.empty()) {
190 version(HUNT_POOL_DEBUG) trace("No awaiting waiter found.");
191 return null;
192 }
193
194 waiter = _waiters.front();
195 }
196 _waiters.removeFront();
197
198 return waiter;
199 }
200
201 private void safeRemoveFrontWaiter() {
202
203 _waitersLocker.lock();
204 scope(exit) {
205 _waitersLocker.unlock();
206 }
207
208 _waiters.removeFront();
209 }
210
211
212 /**
213 *
214 */
215 private T doBorrow() {
216 // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-10-10T16:16:24+08:00
217 // nothrow
218
219 _borrowLocker.lock();
220 bool isUnlocked = false;
221
222 scope(exit) {
223 version(HUNT_POOL_DEBUG_MORE) warningf("isUnlocked: %s...", isUnlocked);
224
225 if(!isUnlocked) {
226 _borrowLocker.unlock();
227 }
228 }
229
230 PooledObject!(T) pooledObj;
231 T underlyingObj = null;
232 bool r = false;
233 size_t index = 0;
234
235 for(; index<_pooledObjects.length; index++) {
236 pooledObj = _pooledObjects[index];
237
238 version(HUNT_POOL_DEBUG_MORE) {
239 tracef("Pool: %s, slot[%d] => %s", _poolOptions.name, index, pooledObj.to!string());
240 }
241
242 if(pooledObj is null) {
243 PooledObject!(T) obj = new PooledObject!(T)();
244 _pooledObjects[index] = obj;
245 pooledObj = obj;
246 isUnlocked = true;
247 _borrowLocker.unlock();
248
249 version(HUNT_POOL_DEBUG_MORE) {
250 tracef("Pool: %s, binding slot[%d] => %s", _poolOptions.name, index, obj.toString());
251 }
252
253 try {
254 underlyingObj = _factory.makeObject();
255
256 // tracef("Pool: %s, binded slot[%d] locking...", _poolOptions.name, index);
257 // _borrowLocker.lock();
258 // tracef("Pool: %s, binded slot[%d] locked... underlyingObj is null: %s", _poolOptions.name, index, underlyingObj is null);
259
260 // obj.bind(underlyingObj);
261 r = pooledObj.allocate(underlyingObj);
262 // isUnlocked = true;
263 // _borrowLocker.unlock();
264 } catch(Throwable t) {
265 warning(t.msg);
266 version(HUNT_DEBUG) warning(t);
267 pooledObj = null;
268 _pooledObjects[index] = null;
269 }
270
271 version(HUNT_POOL_DEBUG) {
272 tracef("Pool: %s, binded slot[%d] => %s", _poolOptions.name, index, obj.toString());
273 }
274
275 break;
276 } else if(pooledObj.isIdle()) {
277 underlyingObj = pooledObj.getObject();
278 bool isValid = _factory.isValid(underlyingObj);
279 if(isValid) {
280 r = pooledObj.allocate();
281 // isUnlocked = true;
282 // _borrowLocker.unlock();
283 break;
284 } else {
285 pooledObj.invalidate();
286 version(HUNT_POOL_DEBUG) {
287 warningf("Pool: %s. An invalid object (id=%d) detected at slot %d.",
288 _poolOptions.name, pooledObj.id, index);
289 }
290 }
291 } else if(pooledObj.isInvalid()) {
292 underlyingObj = pooledObj.getObject();
293 version(HUNT_POOL_DEBUG) {
294 warningf("Pool: %s. An invalid object (id=%d) detected at slot %d.",
295 _poolOptions.name, pooledObj.id, index);
296 }
297 _pooledObjects[index] = null;
298 // isUnlocked = true;
299 // _borrowLocker.unlock();
300 _factory.destroyObject(underlyingObj);
301 break;
302 }
303
304 pooledObj = null;
305 }
306
307 if(pooledObj is null) {
308 version(HUNT_DEBUG) {
309 warningf("Failed to borrow from {%s}", toString());
310 }
311 return null;
312 }
313
314 if(r) {
315 version(HUNT_POOL_DEBUG) {
316 infof("Pool: %s, allocate: %s, borrowed: {%s}", _poolOptions.name, r, pooledObj.toString());
317 }
318 return underlyingObj;
319 } else {
320 warningf("Pool: %s, borrowing collision: slot[%d]", _poolOptions.name, index, pooledObj.toString());
321 return null;
322 }
323 }
324
325 /**
326 * Returns an instance to the pool. By contract, <code>obj</code>
327 * <strong>must</strong> have been obtained using {@link #borrowObject()} or
328 * a related method as defined in an implementation or sub-interface.
329 *
330 * @param obj a {@link #borrowObject borrowed} instance to be returned.
331 */
332 void returnObject(T obj) {
333 if(obj !is null) {
334 doReturning(obj);
335 }
336
337 handleWaiters();
338 }
339
340 private bool doReturning(T obj) {
341 // _pooledObjectsLocker.lock();
342 // scope(exit) {
343 // _pooledObjectsLocker.unlock();
344 // }
345
346 bool result = false;
347
348 PooledObject!(T) pooledObj;
349 for(size_t index; index<_pooledObjects.length; index++) {
350 pooledObj = _pooledObjects[index];
351 if(pooledObj is null) {
352 continue;
353 }
354
355 T underlyingObj = pooledObj.getObject();
356 if(underlyingObj is obj) {
357 version(HUNT_POOL_DEBUG_MORE) {
358 tracef("Pool: %s, slot: %d, returning: {%s}", _poolOptions.name, index, pooledObj.toString());
359 }
360
361 result = pooledObj.returning();
362
363 if(result) {
364 version(HUNT_POOL_DEBUG) {
365 tracef("Pool: %s; slot: %d, Returned: {%s}",
366 _poolOptions.name, index, pooledObj.toString());
367 }
368 } else {
369 errorf("Pool: %s, slot: %d, Return failed: {%s}", _poolOptions.name, index, pooledObj.toString());
370 }
371 break;
372 }
373 }
374
375 version(HUNT_DEBUG) {
376 info(toString());
377 }
378 // info(toString());
379 return result;
380 }
381
382 private void handleWaiters() {
383 if(_state == ObjectPoolState.Closing || _state == ObjectPoolState.Closed) {
384 return;
385 }
386
387 if(_state != ObjectPoolState.Open) {
388 warningf("Failed to query the waiters. The state is %s.", _state);
389 return;
390 }
391
392 while(true) {
393
394 if(_waiters.empty()) {
395 version(HUNT_POOL_DEBUG) warningf("Pool: %s => No waiter avaliable.", _poolOptions.name);
396 break;
397 }
398
399 //
400 T r = doBorrow();
401 if(r is null) {
402 version(HUNT_POOL_DEBUG) warningf("Pool: %s => No idle object avaliable", _poolOptions.name);
403 break;
404 }
405
406 FuturePromise!T waiter = safeGetFrontWaiter();
407 if(waiter is null) {
408 doReturning(r);
409 break;
410 }
411
412 version(HUNT_POOL_DEBUG) {
413 tracef("Borrowing for waiter [%s], isDone: %s", waiter.id(), waiter.isDone());
414 }
415
416 //
417 try {
418 if(waiter.succeeded(r)) {
419 version(HUNT_POOL_DEBUG) {
420 tracef("Borrowed for waiter [%s], result: %s", waiter.id(), (cast(Object)r).toString());
421 }
422 } else {
423 warningf("Failed to set the result for promise [%s] with %s",
424 waiter.id(), (cast(Object)r).toString());
425
426 doReturning(r);
427 }
428 } catch(Throwable ex) {
429 warning(ex);
430 doReturning(r);
431 }
432 }
433 }
434
435 /**
436 * Returns the number of instances currently idle in this pool. This may be
437 * considered an approximation of the number of objects that can be
438 * {@link #borrowObject borrowed} without creating any new instances.
439 * Returns a negative value if this information is not available.
440 * @return the number of instances currently idle in this pool.
441 */
442 size_t getNumIdle() {
443 size_t count = 0;
444
445 foreach(PooledObject!(T) obj; _pooledObjects) {
446 if(obj !is null && obj.isIdle()) {
447 count++;
448 }
449 }
450
451 return count;
452 }
453
454 size_t getNumFree() {
455
456 size_t count = 0;
457
458 foreach(PooledObject!(T) obj; _pooledObjects) {
459 if(obj is null || obj.isUnusable() || obj.isInvalid()) {
460 count++;
461 }
462 }
463
464 return count;
465 }
466
467 /**
468 * Returns the number of instances currently borrowed from this pool. Returns
469 * a negative value if this information is not available.
470 * @return the number of instances currently borrowed from this pool.
471 */
472 size_t getNumActive() {
473 size_t count = 0;
474
475 foreach(PooledObject!(T) obj; _pooledObjects) {
476 if(obj !is null && obj.isInUse()) {
477 count++;
478 }
479 }
480
481 return count;
482 }
483
484 /**
485 * Returns an estimate of the number of threads currently blocked waiting for
486 * an object from the pool. This is intended for monitoring only, not for
487 * synchronization control.
488 *
489 * @return The estimate of the number of threads currently blocked waiting
490 * for an object from the pool
491 */
492 size_t getNumWaiters() {
493 _waitersLocker.lock();
494 scope(exit) {
495 _waitersLocker.unlock();
496 }
497 return walkLength(_waiters[]);
498 }
499
500 /**
501 * Clears any objects sitting idle in the pool, releasing any associated
502 * resources (optional operation). Idle objects cleared must be
503 * {@link PooledObjectFactory#destroyObject(PooledObject)}.
504 *
505 * @throws Exception if the pool cannot be cleared
506 */
507 void clear() {
508 version(HUNT_POOL_DEBUG) {
509 infof("Pool [%s] is clearing...", _poolOptions.name);
510 }
511
512 bool r = cas(&_isClearing, false, true);
513 if(!r) {
514 return;
515 }
516
517 _borrowLocker.lock();
518 scope(exit) {
519 _isClearing = false;
520 version(HUNT_POOL_DEBUG) infof("Pool [%s] is cleared...", _poolOptions.name);
521 _borrowLocker.unlock();
522 }
523
524 for(size_t index; index<_pooledObjects.length; index++) {
525 PooledObject!(T) obj = _pooledObjects[index];
526
527 if(obj !is null) {
528 version(HUNT_POOL_DEBUG) {
529 tracef("clearing object: id=%d, slot=%d", obj.id, index);
530 }
531
532 _pooledObjects[index] = null;
533 obj.abandoned();
534
535 // TODO: It's better to run it asynchronously
536 _factory.destroyObject(obj.getObject());
537 }
538 }
539 }
540
541 /**
542 * Closes this pool, and free any resources associated with it.
543 * <p>
544 * Calling {@link #borrowObject} after invoking this
545 * method on a pool will cause them to throw an {@link IllegalStateException}.
546 * </p>
547 * <p>
548 * Implementations should silently fail if not all resources can be freed.
549 * </p>
550 */
551 void close() {
552 version(HUNT_DEBUG) {
553 // infof("Closing pool %s (state=%s)...", _poolOptions.name, _state);
554 tracef(toString());
555 }
556
557 bool r = cas(&_state, ObjectPoolState.Open, ObjectPoolState.Closing);
558 if(!r) {
559 return;
560 }
561
562 scope(exit) {
563 _state = ObjectPoolState.Closed;
564 version(HUNT_POOL_DEBUG) {
565 infof("Pool %s closed...", _poolOptions.name);
566 }
567 }
568
569 for(size_t index; index<_pooledObjects.length; index++) {
570 PooledObject!(T) obj = _pooledObjects[index];
571
572 if(obj !is null) {
573 version(HUNT_POOL_DEBUG) {
574 tracef("Pool: %s, destroying object: id=%d, slot=%d, state: %s",
575 _poolOptions.name, obj.id, index, obj.state());
576
577 if(obj.state() == PooledObjectState.ALLOCATED) {
578 warningf("binded obj: %s", (cast(Object)obj.getObject()).toString());
579 }
580 }
581
582 _pooledObjects[index] = null;
583 obj.abandoned();
584
585 // TODO: It's better to run it asynchronously
586 _factory.destroyObject(obj.getObject());
587 }
588 }
589
590 }
591
592 override string toString() {
593 string str = format("Name: %s, Total: %d, Active: %d, Idle: %d, Free: %d, Waiters: %d",
594 _poolOptions.name, size(), getNumActive(), getNumIdle(), getNumFree(), getNumWaiters());
595 return str;
596 }
597 }