/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.concurrency.IteratingCallback;

import hunt.concurrency.Locker;
import hunt.util.Common;
import hunt.Exceptions;

import std.format;


/**
 * This specialized callback implements a pattern that allows
 * a large job to be broken into smaller tasks using iteration
 * rather than recursion.
 * <p>
 * A typical example is the write of a large content to a socket,
 * divided in chunks. Chunk C1 is written by thread T1, which
 * also invokes the callback, which writes chunk C2, which invokes
 * the callback again, which writes chunk C3, and so forth.
 * </p>
 * <p>
 * The problem with the example is that if the callback thread
 * is the same that performs the I/O operation, then the process
 * is recursive and may result in a stack overflow.
 * To avoid the stack overflow, a thread dispatch must be performed,
 * causing context switching and cache misses, affecting performance.
 * </p>
 * <p>
 * To avoid this issue, this callback records in its internal state
 * whether the success callback has been called during the processing
 * of a sub task, and if so then the processing iterates rather than
 * recursing.
 * </p>
 * <p>
 * Subclasses must implement method {@link #process()} where the sub
 * task is executed and a suitable {@link IteratingCallback.Action} is
 * returned to this callback to indicate the overall progress of the job.
 * This callback is passed to the asynchronous execution of each sub
 * task and a call to {@link #succeeded()} on this callback represents
 * the completion of the sub task.
 * </p>
 * <p>
 * NOTE(review): only {@link #processing()} takes the internal lock; every
 * other method reads and updates the state without it. Confirm whether
 * instances are meant to be used from more than one thread.
 * </p>
 */
abstract class IteratingCallback : Callback {
    /**
     * The internal states of this callback
     */
    private enum State {
        /**
         * This callback is IDLE, ready to iterate.
         */
        IDLE,

        /**
         * This callback is iterating calls to {@link #process()} and is dealing with
         * the returns.
         */
        PROCESSING,

        /**
         * Waiting for a scheduled callback
         */
        PENDING,

        /**
         * Called by a scheduled callback
         */
        CALLED,

        /**
         * The overall job has succeeded as indicated by a {@link Action#SUCCEEDED} return
         * from {@link IteratingCallback#process()}
         */
        SUCCEEDED,

        /**
         * The overall job has failed as indicated by a call to {@link IteratingCallback#failed(Exception)}
         */
        FAILED,

        /**
         * This callback has been closed and cannot be reset.
         */
        CLOSED
    }

    /**
     * The indication of the overall progress of the overall job that
     * implementations of {@link #process()} must return.
     */
    protected enum Action {
        /**
         * Indicates that {@link #process()} has no more work to do,
         * but the overall job is not completed yet, probably waiting
         * for additional events to trigger more work.
         */
        IDLE,

        /**
         * Indicates that {@link #process()} is executing asynchronously
         * a sub task, where the execution has started but the callback
         * may have not yet been invoked.
         */
        SCHEDULED,

        /**
         * Indicates that {@link #process()} has completed the overall job.
         */
        SUCCEEDED
    }

    // Created in the constructor; every constructor must go through this().
    private Locker _locker;
    private State _state;
    // Set by iterate() while in PROCESSING so that an Action.IDLE result is retried.
    private bool _iterate;


    /**
     * Creates a callback that starts in the IDLE state.
     */
    protected this() {
        _locker = new Locker();
        _state = State.IDLE;
    }

    /**
     * Creates a callback that starts in the SUCCEEDED state when
     * {@code needReset} is true (so {@link #reset()} must be called before
     * iterating), otherwise in the IDLE state.
     *
     * @param needReset whether the callback must be reset before first use
     */
    protected this(bool needReset) {
        // Delegate so that _locker is always initialized; processing() dereferences it.
        this();
        if (needReset)
            _state = State.SUCCEEDED;
    }

    /**
     * Method called by {@link #iterate()} to process the sub task.
     * <p>
     * Implementations must start the asynchronous execution of the sub task
     * (if any) and return an appropriate action:
     * </p>
     * <ul>
     * <li>{@link Action#IDLE} when no sub tasks are available for execution
     * but the overall job is not completed yet</li>
     * <li>{@link Action#SCHEDULED} when the sub task asynchronous execution
     * has been started</li>
     * <li>{@link Action#SUCCEEDED} when the overall job is completed</li>
     * </ul>
     *
     * @return the appropriate Action
     * @throws Exception if the sub task processing throws
     */
    protected abstract Action process();

    /**
     * Invoked when the overall task has completed successfully.
     *
     * @see #onCompleteFailure(Exception)
     */
    protected void onCompleteSuccess() {
    }

    /**
     * Invoked when the overall task has completed with a failure.
     *
     * @param cause the throwable to indicate cause of failure
     * @see #onCompleteSuccess()
     */
    protected void onCompleteFailure(Exception cause) {
    }

    /**
     * This method must be invoked by applications to start the processing
     * of sub tasks. It can be called at any time by any thread, and its
     * contract is that when called, then the {@link #process()} method will
     * be called during or soon after, either by the calling thread or by
     * another thread.
     *
     * @throws IllegalStateException if this callback is closed
     */
    void iterate() {
        bool shouldProcess = false;

        // NOTE(review): this transition is performed without taking _locker.
        switch (_state) {
            case State.PENDING:
            case State.CALLED:
                // process() will be called when the pending callback is handled
                break;

            case State.IDLE:
                _state = State.PROCESSING;
                shouldProcess = true;
                break;

            case State.PROCESSING:
                // Ask the running processing() loop to go around once more.
                _iterate = true;
                break;

            case State.FAILED:
            case State.SUCCEEDED:
                break;

            case State.CLOSED:
            default:
                throw new IllegalStateException(toString());
        }

        if (shouldProcess)
            processing();
    }

    /**
     * Repeatedly calls {@link #process()} and acts on the returned action
     * until the job goes idle, is waiting for a callback, or has completed.
     * An exception thrown by {@link #process()} is routed to
     * {@link #failed(Exception)}.
     */
    private void processing() {
        // This should only ever be called when in processing state, however a failed or close call
        // may happen concurrently, so state is not assumed.

        bool completedSuccessfully = false;

        // While we are processing
        loop:
        while (true) {
            // Call process to get the action that we have to take.
            Action action;
            try {
                action = process();
            } catch (Exception x) {
                failed(x);
                break;
            }

            // Act on the action we have just received while holding the lock.
            Locker.Lock lock = _locker.lock();
            // Release the lock on every way out of this pass (break, continue or throw);
            // otherwise the acquiring thread would keep it forever.
            // NOTE(review): assumes Locker.Lock.close() unlocks, as in the Jetty
            // original this class is ported from - confirm against hunt.concurrency.Locker.
            scope (exit) lock.close();

            switch (_state) {
                case State.PROCESSING: {
                    switch (action) {
                        case Action.IDLE: {
                            // Has iterate been called while we were processing?
                            if (_iterate) {
                                // yes, so skip idle and keep processing
                                _iterate = false;
                                _state = State.PROCESSING;
                                continue loop;
                            }

                            // No, so we can go idle
                            _state = State.IDLE;
                            break loop;
                        }

                        case Action.SCHEDULED: {
                            // we won the race against the callback, so the callback has to process and we can break processing
                            _state = State.PENDING;
                            break loop;
                        }

                        case Action.SUCCEEDED: {
                            // The overall job is done; report it once the lock is released.
                            _iterate = false;
                            _state = State.SUCCEEDED;
                            completedSuccessfully = true;
                            break loop;
                        }

                        default:
                            throw new IllegalStateException(format("%s[action=%s]", this, action));
                    }
                }

                case State.CALLED: {
                    switch (action) {
                        case Action.SCHEDULED: {
                            // we lost the race, so we have to keep processing
                            _state = State.PROCESSING;
                            continue loop;
                        }

                        default:
                            throw new IllegalStateException(format("%s[action=%s]", this, action));
                    }
                }

                case State.SUCCEEDED:
                case State.FAILED:
                case State.CLOSED:
                    break loop;

                case State.IDLE:
                case State.PENDING:
                default:
                    throw new IllegalStateException(format("%s[action=%s]", this, action));
            }
        }

        if (completedSuccessfully)
            onCompleteSuccess();
    }

    /**
     * Invoked when the sub task succeeds.
     * Subclasses that override this method must always remember to call
     * {@code super.succeeded()}.
     *
     * @throws IllegalStateException if called in the IDLE, CALLED or SUCCEEDED state
     */
    override
    void succeeded() {
        bool shouldProcess = false;

        // NOTE(review): this transition is performed without taking _locker.
        switch (_state) {
            case State.PROCESSING: {
                // processing() is still running; it will notice CALLED and iterate.
                _state = State.CALLED;
                break;
            }
            case State.PENDING: {
                // processing() has already returned; resume it on this thread.
                _state = State.PROCESSING;
                shouldProcess = true;
                break;
            }
            case State.CLOSED:
            case State.FAILED: {
                // Too late!
                break;
            }
            default: {
                throw new IllegalStateException(toString());
            }
        }

        if (shouldProcess)
            processing();
    }

    /**
     * Invoked when the sub task fails.
     * Subclasses that override this method must always remember to call
     * {@code super.failed(Exception)}.
     *
     * @param x the cause of the failure
     */
    override
    void failed(Exception x) {
        bool failure = false;

        // NOTE(review): this transition is performed without taking _locker.
        switch (_state) {
            case State.SUCCEEDED:
            case State.FAILED:
            case State.IDLE:
            case State.CLOSED:
            case State.CALLED:
                // too late!
                break;

            case State.PENDING:
            case State.PROCESSING: {
                _state = State.FAILED;
                failure = true;
                break;
            }
            default:
                throw new IllegalStateException(toString());
        }

        if (failure)
            onCompleteFailure(x);
    }

    /**
     * @return always false
     */
    bool isNonBlocking() {
        return false;
    }

    /**
     * Moves this callback to the CLOSED state. If it was neither IDLE nor
     * already completed or closed, {@link #onCompleteFailure(Exception)} is
     * invoked with a ClosedChannelException.
     */
    void close() {
        bool failure = false;

        // NOTE(review): this transition is performed without taking _locker.
        switch (_state) {
            case State.IDLE:
            case State.SUCCEEDED:
            case State.FAILED:
                _state = State.CLOSED;
                break;

            case State.CLOSED:
                break;

            default:
                // Closed while work was in flight: report it as a failure.
                _state = State.CLOSED;
                failure = true;
                break;
        }

        if (failure)
            onCompleteFailure(new ClosedChannelException(""));
    }

    /*
     * only for testing
     * @return whether this callback is idle and {@link #iterate()} needs to be called
     */
    bool isIdle() {
        return _state == State.IDLE;
    }

    /**
     * @return whether this callback has been closed
     */
    bool isClosed() {
        return _state == State.CLOSED;
    }

    /**
     * @return whether this callback has failed
     */
    bool isFailed() {
        return _state == State.FAILED;
    }

    /**
     * @return whether this callback has succeeded
     */
    bool isSucceeded() {
        return _state == State.SUCCEEDED;
    }

    /**
     * Resets this callback.
     * <p>
     * A callback can only be reset to IDLE from the
     * SUCCEEDED or FAILED states or if it is already IDLE.
     * </p>
     *
     * @return true if the reset was successful
     */
    bool reset() {
        // NOTE(review): this transition is performed without taking _locker.
        switch (_state) {
            case State.IDLE:
                return true;

            case State.SUCCEEDED:
            case State.FAILED:
                _iterate = false;
                _state = State.IDLE;
                return true;

            default:
                return false;
        }
    }

    /**
     * @return the superclass string representation followed by the current state in brackets
     */
    override
    string toString() {
        return format("%s[%s]", super.toString(), _state);
    }
}