module hunt.util.pool.ObjectPool;

import hunt.concurrency.Future;
import hunt.concurrency.Promise;
import hunt.concurrency.FuturePromise;
import hunt.logging;

import core.atomic;
import core.sync.mutex;
import core.time;

import std.container.dlist;
import std.conv;
import std.format;
import std.range : walkLength;

import hunt.util.pool.ObjectFactory;
import hunt.util.pool.PooledObject;
import hunt.util.pool.PooledObjectState;

/**
 * Strategy for creating the pooled objects.
 *
 * NOTE(review): nothing in this module reads `PoolOptions.creationMode`;
 * slots are always filled on demand by `ObjectPool.doBorrow`.
 */
enum CreationMode {
    Lazy,
    Eager
}

/**
 * Configuration of an `ObjectPool`.
 */
class PoolOptions {
    /// Number of slots in the pool (the slot array is allocated with this length).
    size_t size = 5;

    /// Maximum number of queued waiters; -1 disables the limit.
    int maxWaitQueueSize = -1;

    /// Name of the pool, used in log messages only.
    string name;

    /// Timeout applied by the parameterless `ObjectPool.borrow()`.
    Duration waitTimeout = 15.seconds;

    /// See the note on `CreationMode`.
    CreationMode creationMode = CreationMode.Lazy;
}


/**
 * Life-cycle state of an `ObjectPool`.
 */
enum ObjectPoolState {
    Open,
    Closing,
    Closed
}

/**
 * A pool with a fixed number of slots holding objects of type `T`.
 *
 * Objects are produced, validated and destroyed through an `ObjectFactory!(T)`.
 * Callers that cannot be served immediately are queued as promises and are
 * served, in queue order, whenever an object is borrowed or returned.
 *
 * Locking as visible in this class: `_borrowLocker` is held while `doBorrow`
 * scans the slots and while `clear` empties them; `_waitersLocker` guards
 * every access to the waiter queue. `doReturning` and `close` take no lock.
 */
class ObjectPool(T) {

    private PoolOptions _poolOptions;
    private shared ObjectPoolState _state = ObjectPoolState.Open;
    private shared bool _isClearing = false;
    private shared int _waiterNumber = 0;
    private ObjectFactory!(T) _factory;
    private PooledObject!(T)[] _pooledObjects;
    private Mutex _borrowLocker;
    private Mutex _waitersLocker;
    private DList!(FuturePromise!T) _waiters;

    static if(is(T == class) && __traits(compiles, new T())) {
        /**
         * Creates a pool that builds its objects with `DefaultObjectFactory!(T)`.
         * Only available when `T` is a class that can be default-constructed.
         */
        this(PoolOptions options) {
            this(new DefaultObjectFactory!(T)(), options);
        }
    }

    /**
     * Creates a pool with `options.size` empty slots.
     *
     * Params:
     *   factory = creates, validates and destroys the pooled objects
     *   options = pool configuration
     */
    this(ObjectFactory!(T) factory, PoolOptions options) {
        _factory = factory;
        _poolOptions = options;
        _pooledObjects = new PooledObject!(T)[options.size];
        _waitersLocker = new Mutex();
        _borrowLocker = new Mutex();
    }

    /// Returns the current life-cycle state of the pool.
    ObjectPoolState state() {
        return _state;
    }

    /// Returns the configured number of slots.
    size_t size() {
        return _poolOptions.size;
    }

    /**
     * Borrows an object, using the configured `PoolOptions.waitTimeout`.
     *
     * Borrowed objects must be handed back with `returnObject`.
     *
     * Returns: the borrowed object; see `borrow(Duration, bool)` for details.
     */
    T borrow() {
        return borrow(_poolOptions.waitTimeout, true);
    }

    /**
     * Borrows an object from the pool.
     *
     * Params:
     *   timeout = `Duration.zero` makes a single immediate attempt;
     *             a negative value calls `Future.get()` without a timeout;
     *             a positive value calls `Future.get(timeout)`.
     *   isQuiet = only consulted for a zero timeout: when `false`, a failed
     *             attempt throws instead of returning `null`.
     *
     * Returns: the borrowed object, or `null` when a zero-timeout attempt
     *          fails and `isQuiet` is `true`.
     *
     * Throws: `Exception` when a zero-timeout attempt fails and `isQuiet` is
     *         `false`. For a non-zero timeout, whatever `Future.get` throws
     *         (presumably on timeout or when the promise was failed -- see
     *         `FuturePromise`).
     */
    T borrow(Duration timeout, bool isQuiet = true) {
        T r;
        if(timeout == Duration.zero) {
            r = doBorrow();
            if(r is null && !isQuiet) {
                throw new Exception("No idle object avaliable.");
            }
        } else {
            Future!T future = borrowAsync();
            if(timeout.isNegative()) {
                version(HUNT_POOL_DEBUG) {
                    tracef("Borrowing with promise [%s]...", (cast(FuturePromise!T)future).id());
                }
                r = future.get();
            } else {
                version(HUNT_POOL_DEBUG) {
                    tracef("Borrowing with promise [%s] in %s", (cast(FuturePromise!T)future).id(), timeout);
                }
                r = future.get(timeout);
            }

            version(HUNT_POOL_DEBUG_MORE) {
                tracef("Borrowed {%s} from promise [%s]", r.to!string(), (cast(FuturePromise!T)future).id());
            }
        }

        return r;
    }


    /**
     * Queues a request for an object and returns the promise that will
     * receive it.
     *
     * The returned promise is failed immediately, without being queued, when
     * the pool is not open (it could never be served, because
     * `handleWaiters` does nothing once the pool is closing or closed) or
     * when the waiter queue has reached `PoolOptions.maxWaitQueueSize`.
     *
     * Returns: the promise for the requested object.
     */
    Future!T borrowAsync() {
        version (HUNT_POOL_DEBUG) infof("Borrowing...%s", toString());

        int number = atomicOp!("+=")(_waiterNumber, 1) - 1;
        FuturePromise!T promise = new FuturePromise!T("PoolWaiter " ~ number.to!string());

        // A waiter queued on a pool that is closing or closed would never be
        // served, so reject the request right away instead of letting the
        // caller block.
        ObjectPoolState currentState = state();
        if(currentState != ObjectPoolState.Open) {
            string closedMsg = format("The pool %s is not open (state: %s).",
                _poolOptions.name, currentState);

            version(HUNT_DEBUG) {
                warning(closedMsg);
            }
            promise.failed(new Exception(closedMsg));
            return promise;
        }

        size_t waitNumber = getNumWaiters();
        version(HUNT_POOL_DEBUG_MORE) {
            tracef("Pool: %s, new waiter [%s], current waitNumber: %d", _poolOptions.name, promise.id(), waitNumber);
        }

        if(_poolOptions.maxWaitQueueSize == -1 || waitNumber < _poolOptions.maxWaitQueueSize) {
            safeInsertWaiter(promise);
        } else {
            string msg = format("Reach to the max WaitNumber (%d), the current: %d",
                _poolOptions.maxWaitQueueSize, waitNumber);

            version(HUNT_DEBUG) {
                warning(msg);
            }
            promise.failed(new Exception(msg));
        }

        handleWaiters();
        return promise;
    }

    /// Appends a waiter to the queue while holding the waiter lock.
    private void safeInsertWaiter(FuturePromise!T promise) {
        _waitersLocker.lock();
        scope(exit) {
            _waitersLocker.unlock();
        }

        _waiters.stableInsert(promise);
    }

    /// Tells, under the waiter lock, whether the waiter queue is non-empty.
    private bool safeHasWaiters() {
        _waitersLocker.lock();
        scope(exit) {
            _waitersLocker.unlock();
        }

        return !_waiters.empty;
    }

    /**
     * Removes and returns the first waiter that is not done yet.
     *
     * Waiters at the front of the queue that are already done are dropped
     * on the way.
     *
     * Returns: the first pending waiter, or `null` when none is left.
     */
    private FuturePromise!T safeGetFrontWaiter() {
        _waitersLocker.lock();
        scope(exit) {
            _waitersLocker.unlock();
        }

        // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-10-10T21:13:56+08:00
        // More test
        if(_waiters.empty) return null;

        FuturePromise!T waiter = _waiters.front();

        // Drop every finished waiter until a pending one is found.
        while(waiter.isDone()) {
            version(HUNT_POOL_DEBUG_MORE)
                tracef("Waiter %s is done, so removed.", waiter.id());
            _waiters.removeFront();

            if(_waiters.empty()) {
                version(HUNT_POOL_DEBUG) trace("No awaiting waiter found.");
                return null;
            }

            waiter = _waiters.front();
        }
        _waiters.removeFront();

        return waiter;
    }

    /// Removes the first waiter, if there is one, while holding the waiter lock.
    private void safeRemoveFrontWaiter() {

        _waitersLocker.lock();
        scope(exit) {
            _waitersLocker.unlock();
        }

        // removeFront() requires a non-empty list.
        if(!_waiters.empty) {
            _waiters.removeFront();
        }
    }


    /**
     * Makes one attempt to take an object out of the pool.
     *
     * The slots are scanned in order:
     * $(UL
     *   $(LI an empty slot gets a new `PooledObject` bound to an object made
     *        by the factory;)
     *   $(LI an idle slot is handed out when the factory reports its object
     *        as valid, otherwise it is marked invalid and skipped;)
     *   $(LI an invalid slot is emptied, its object destroyed, and the slot is
     *        then re-examined as an empty one.)
     * )
     *
     * Returns: the borrowed object, or `null` when the pool is not open, no
     *          slot could be used, the factory failed, or `allocate`
     *          reported `false`.
     */
    private T doBorrow() {
        // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-10-10T16:16:24+08:00
        // nothrow

        // Never hand out (or create) objects once close() has started;
        // otherwise they would outlive the pool.
        if(state() != ObjectPoolState.Open) {
            return null;
        }

        _borrowLocker.lock();
        bool isUnlocked = false;

        scope(exit) {
            version(HUNT_POOL_DEBUG_MORE) warningf("isUnlocked: %s...", isUnlocked);

            // The empty-slot branch releases the lock early; do not unlock twice.
            if(!isUnlocked) {
                _borrowLocker.unlock();
            }
        }

        PooledObject!(T) pooledObj;
        T underlyingObj = null;
        bool r = false;
        size_t index = 0;

        while(index < _pooledObjects.length) {
            pooledObj = _pooledObjects[index];

            version(HUNT_POOL_DEBUG_MORE) {
                tracef("Pool: %s, slot[%d] => %s", _poolOptions.name, index, pooledObj.to!string());
            }

            if(pooledObj is null) {
                // Reserve the slot first, then release the lock so that the
                // factory call below does not run while the lock is held.
                PooledObject!(T) obj = new PooledObject!(T)();
                _pooledObjects[index] = obj;
                pooledObj = obj;
                isUnlocked = true;
                _borrowLocker.unlock();

                version(HUNT_POOL_DEBUG_MORE) {
                    tracef("Pool: %s, binding slot[%d] => %s", _poolOptions.name, index, obj.toString());
                }

                try {
                    underlyingObj = _factory.makeObject();
                    r = pooledObj.allocate(underlyingObj);
                } catch(Throwable t) {
                    warning(t.msg);
                    version(HUNT_DEBUG) warning(t);
                    // Give the reserved slot back. Note that the lock is no
                    // longer held at this point.
                    pooledObj = null;
                    _pooledObjects[index] = null;
                }

                version(HUNT_POOL_DEBUG) {
                    tracef("Pool: %s, binded slot[%d] => %s", _poolOptions.name, index, obj.toString());
                }

                break;
            } else if(pooledObj.isIdle()) {
                underlyingObj = pooledObj.getObject();
                bool isValid = _factory.isValid(underlyingObj);
                if(isValid) {
                    r = pooledObj.allocate();
                    break;
                } else {
                    pooledObj.invalidate();
                    version(HUNT_POOL_DEBUG) {
                        warningf("Pool: %s. An invalid object (id=%d) detected at slot %d.",
                            _poolOptions.name, pooledObj.id, index);
                    }
                }
            } else if(pooledObj.isInvalid()) {
                underlyingObj = pooledObj.getObject();
                version(HUNT_POOL_DEBUG) {
                    warningf("Pool: %s. An invalid object (id=%d) detected at slot %d.",
                        _poolOptions.name, pooledObj.id, index);
                }
                _pooledObjects[index] = null;
                _factory.destroyObject(underlyingObj);

                // The slot is free now: look at the same index again so that
                // it is refilled by the empty-slot branch instead of failing
                // this borrow and leaving a waiter unserved.
                underlyingObj = null;
                pooledObj = null;
                continue;
            }

            pooledObj = null;
            index++;
        }

        if(pooledObj is null) {
            version(HUNT_DEBUG) {
                warningf("Failed to borrow from {%s}", toString());
            }
            return null;
        }

        if(r) {
            version(HUNT_POOL_DEBUG) {
                infof("Pool: %s, allocate: %s, borrowed: {%s}", _poolOptions.name, r, pooledObj.toString());
            }
            return underlyingObj;
        } else {
            warningf("Pool: %s, borrowing collision: slot[%d] => {%s}",
                _poolOptions.name, index, pooledObj.toString());
            return null;
        }
    }

    /**
     * Hands a borrowed object back to the pool and then serves queued waiters.
     *
     * Params:
     *   obj = an object previously obtained from `borrow` or `borrowAsync`;
     *         `null` is accepted and only triggers the waiter handling.
     */
    void returnObject(T obj) {
        if(obj !is null) {
            doReturning(obj);
        }

        handleWaiters();
    }

    /**
     * Finds the slot bound to `obj` (by identity) and calls `returning()` on it.
     *
     * No lock is taken here.
     *
     * Returns: the result of `PooledObject.returning()`, or `false` when no
     *          slot holds `obj`.
     */
    private bool doReturning(T obj) {
        bool result = false;

        PooledObject!(T) pooledObj;
        for(size_t index; index<_pooledObjects.length; index++) {
            pooledObj = _pooledObjects[index];
            if(pooledObj is null) {
                continue;
            }

            T underlyingObj = pooledObj.getObject();
            if(underlyingObj is obj) {
                version(HUNT_POOL_DEBUG_MORE) {
                    tracef("Pool: %s, slot: %d, returning: {%s}", _poolOptions.name, index, pooledObj.toString());
                }

                result = pooledObj.returning();

                if(result) {
                    version(HUNT_POOL_DEBUG) {
                        tracef("Pool: %s; slot: %d, Returned: {%s}",
                                _poolOptions.name, index, pooledObj.toString());
                    }
                } else {
                    errorf("Pool: %s, slot: %d, Return failed: {%s}", _poolOptions.name, index, pooledObj.toString());
                }
                break;
            }
        }

        version(HUNT_DEBUG) {
            info(toString());
        }
        return result;
    }

    /**
     * Serves queued waiters for as long as objects can be borrowed.
     *
     * Does nothing unless the pool is open. Each round borrows an object
     * first and then takes the first pending waiter; the object is returned
     * to the pool when no pending waiter is left or when the waiter does not
     * accept it.
     */
    private void handleWaiters() {
        if(_state == ObjectPoolState.Closing || _state == ObjectPoolState.Closed) {
            return;
        }

        // Reachable only when the state changes between the two reads.
        if(_state != ObjectPoolState.Open) {
            warningf("Failed to query the waiters. The state is %s.", _state);
            return;
        }

        while(true) {

            // The queue is shared with other threads, so it is only inspected
            // under the waiter lock.
            if(!safeHasWaiters()) {
                version(HUNT_POOL_DEBUG) warningf("Pool: %s => No waiter avaliable.", _poolOptions.name);
                break;
            }

            T r = doBorrow();
            if(r is null) {
                version(HUNT_POOL_DEBUG) warningf("Pool: %s => No idle object avaliable", _poolOptions.name);
                break;
            }

            FuturePromise!T waiter = safeGetFrontWaiter();
            if(waiter is null) {
                // Every queued waiter was already done: put the object back.
                doReturning(r);
                break;
            }

            version(HUNT_POOL_DEBUG) {
                tracef("Borrowing for waiter [%s], isDone: %s", waiter.id(), waiter.isDone());
            }

            try {
                if(waiter.succeeded(r)) {
                    version(HUNT_POOL_DEBUG) {
                        tracef("Borrowed for waiter [%s], result: %s", waiter.id(), (cast(Object)r).toString());
                    }
                } else {
                    warningf("Failed to set the result for promise [%s] with %s",
                        waiter.id(), (cast(Object)r).toString());

                    doReturning(r);
                }
            } catch(Throwable ex) {
                warning(ex);
                doReturning(r);
            }
        }
    }

    /**
     * Counts the slots whose pooled object reports `isIdle()`.
     *
     * The slots are read without locking, so the value is a snapshot.
     */
    size_t getNumIdle() {
        size_t count = 0;

        foreach(PooledObject!(T) obj; _pooledObjects) {
            if(obj !is null && obj.isIdle()) {
                count++;
            }
        }

        return count;
    }

    /**
     * Counts the slots that are empty or whose pooled object reports
     * `isUnusable()` or `isInvalid()`.
     *
     * The slots are read without locking, so the value is a snapshot.
     */
    size_t getNumFree() {

        size_t count = 0;

        foreach(PooledObject!(T) obj; _pooledObjects) {
            if(obj is null || obj.isUnusable() || obj.isInvalid()) {
                count++;
            }
        }

        return count;
    }

    /**
     * Counts the slots whose pooled object reports `isInUse()`.
     *
     * The slots are read without locking, so the value is a snapshot.
     */
    size_t getNumActive() {
        size_t count = 0;

        foreach(PooledObject!(T) obj; _pooledObjects) {
            if(obj !is null && obj.isInUse()) {
                count++;
            }
        }

        return count;
    }

    /**
     * Returns the number of promises currently in the waiter queue.
     *
     * Promises that are already done but have not been dropped from the
     * queue yet are included. Intended for monitoring, not for
     * synchronization control.
     */
    size_t getNumWaiters() {
        _waitersLocker.lock();
        scope(exit) {
            _waitersLocker.unlock();
        }
        return walkLength(_waiters[]);
    }

    /**
     * Empties every slot and destroys the objects through the factory.
     *
     * All non-empty slots are cleared, whatever their state. Runs under the
     * borrow lock; a call made while another `clear` is in progress returns
     * immediately.
     */
    void clear() {
        version(HUNT_POOL_DEBUG) {
            infof("Pool [%s] is clearing...", _poolOptions.name);
        }

        // Only one clear at a time.
        bool r = cas(&_isClearing, false, true);
        if(!r) {
            return;
        }

        _borrowLocker.lock();
        scope(exit) {
            _isClearing = false;
            version(HUNT_POOL_DEBUG) infof("Pool [%s] is cleared...", _poolOptions.name);
            _borrowLocker.unlock();
        }

        for(size_t index; index<_pooledObjects.length; index++) {
            PooledObject!(T) obj = _pooledObjects[index];

            if(obj !is null) {
                version(HUNT_POOL_DEBUG) {
                    tracef("clearing object: id=%d, slot=%d", obj.id, index);
                }

                _pooledObjects[index] = null;
                obj.abandoned();

                // TODO: It's better to run it asynchronously
                _factory.destroyObject(obj.getObject());
            }
        }
    }

    /**
     * Closes the pool: empties every slot and destroys the objects through
     * the factory.
     *
     * The state moves from `Open` to `Closing` and ends as `Closed`; a call
     * on a pool that is not open returns immediately. Afterwards
     * `borrowAsync` fails its promise and a zero-timeout `borrow` finds no
     * object.
     *
     * NOTE(review): waiters that are still queued are not notified here.
     */
    void close() {
        version(HUNT_DEBUG) {
            // infof("Closing pool %s (state=%s)...", _poolOptions.name, _state);
            tracef(toString());
        }

        bool r = cas(&_state, ObjectPoolState.Open, ObjectPoolState.Closing);
        if(!r) {
            return;
        }

        scope(exit) {
            _state = ObjectPoolState.Closed;
            version(HUNT_POOL_DEBUG) {
                infof("Pool %s closed...", _poolOptions.name);
            }
        }

        for(size_t index; index<_pooledObjects.length; index++) {
            PooledObject!(T) obj = _pooledObjects[index];

            if(obj !is null) {
                version(HUNT_POOL_DEBUG) {
                    tracef("Pool: %s, destroying object: id=%d, slot=%d, state: %s",
                        _poolOptions.name,  obj.id, index, obj.state());

                    if(obj.state() == PooledObjectState.ALLOCATED) {
                        warningf("binded obj: %s", (cast(Object)obj.getObject()).toString());
                    }
                }

                _pooledObjects[index] = null;
                obj.abandoned();

                // TODO: It's better to run it asynchronously
                _factory.destroyObject(obj.getObject());
            }
        }

    }

    /// Returns a one-line summary of the pool: name, size and the current counters.
    override string toString() {
        string str = format("Name: %s, Total: %d, Active: %d, Idle: %d, Free: %d, Waiters: %d",
                _poolOptions.name, size(), getNumActive(), getNumIdle(), getNumFree(), getNumWaiters());
        return str;
    }
}