1 module hunt.util.thread.PooledThread; 2 3 import hunt.Functions; 4 import hunt.logging; 5 import hunt.util.thread.Task; 6 7 import core.atomic; 8 // import core.memory; 9 import core.thread; 10 import core.sync.condition; 11 import core.sync.mutex; 12 13 import std.conv; 14 import std.format; 15 16 17 enum PooledThreadState { 18 Idle, 19 Busy, // occupied 20 Stopped 21 } 22 23 alias PooledThreadHandler = Action2!(PooledThread, Task); 24 25 /** 26 * 27 */ 28 class PooledThread : Thread { 29 30 private shared PooledThreadState _state; 31 private size_t _index; 32 private Task _task; 33 private Duration _timeout; 34 35 private Condition _condition; 36 private Mutex _mutex; 37 38 private PooledThreadHandler _taskDoneHandler; 39 40 this(size_t index, Duration timeout = 5.seconds, size_t stackSize = 0) { 41 _index = index; 42 _timeout = timeout; 43 _state = PooledThreadState.Idle; 44 _mutex = new Mutex(); 45 _condition = new Condition(_mutex); 46 this.name = "PooledThread-" ~ _index.to!string(); 47 super(&run, stackSize); 48 } 49 50 void onTaskDone(PooledThreadHandler handler) { 51 _taskDoneHandler = handler; 52 } 53 54 void stop() { 55 version(HUNT_POOL_DEBUG_MORE) { 56 infof("Stopping thread %s", this.name); 57 } 58 59 _mutex.lock(); 60 scope (exit) { 61 version(HUNT_POOL_DEBUG_MORE) { 62 infof("thread [%s] stopped", this.name); 63 } 64 _mutex.unlock(); 65 } 66 _state = PooledThreadState.Stopped; 67 _condition.notify(); 68 } 69 70 bool isBusy() { 71 return _state == PooledThreadState.Busy; 72 } 73 74 bool isIdle() { 75 return _state == PooledThreadState.Idle; 76 } 77 78 PooledThreadState state() { 79 return _state; 80 } 81 82 size_t index() { 83 return _index; 84 } 85 86 Task task() { 87 return _task; 88 } 89 90 bool attatch(Task task) { 91 assert(task !is null); 92 version(HUNT_POOL_DEBUG_MORE) { 93 tracef("attatching task %s with thread %s (state: %s)", task.name, this.name, _state); 94 } 95 96 bool r = cas(&_state, PooledThreadState.Idle, PooledThreadState.Busy); 97 98 if (r) { 99 _mutex.lock(); 100 scope (exit) { 101 version(HUNT_POOL_DEBUG) { 102 infof("task [%s] attatched with thread %s", task.name, this.name); 103 } 104 _mutex.unlock(); 105 } 106 _task = task; 107 _condition.notify(); 108 109 } else { 110 string msg = format("Thread [%s] is unavailable. state: %s", this.name(), _state); 111 warningf(msg); 112 throw new Exception(msg); 113 } 114 115 return r; 116 } 117 118 private void run() nothrow { 119 while (_state != PooledThreadState.Stopped) { 120 121 try { 122 doRun(); 123 } catch (Throwable ex) { 124 warning(ex); 125 } 126 127 version (HUNT_POOL_DEBUG_MORE) { 128 tracef("%s Done. state: %s", this.name(), _state); 129 } 130 131 Task task = _task; 132 if(_state != PooledThreadState.Stopped) { 133 bool r = cas(&_state, PooledThreadState.Busy, PooledThreadState.Idle); 134 version(HUNT_POOL_DEBUG_MORE) { 135 if(!r) { 136 warningf("Failed to set thread %s to Idle, its state is %s", this.name, _state); 137 } 138 } 139 } 140 141 version(HUNT_POOL_DEBUG_MORE) tracef("Run done: %s", this.toString()); 142 _task = null; 143 144 if(_taskDoneHandler !is null && task !is null) { 145 try { 146 _taskDoneHandler(this, task); 147 } catch(Throwable t) { 148 warning(t); 149 } 150 } 151 } 152 153 version (HUNT_POOL_DEBUG) { 154 tracef("Thread [%s] stopped. State: %s", this.name(), _state); 155 } 156 } 157 158 private bool _isWaiting = false; 159 160 private void doRun() { 161 _mutex.lock(); 162 163 Task task = _task; 164 while(task is null && _state != PooledThreadState.Stopped) { 165 bool r = _condition.wait(_timeout); 166 task = _task; 167 168 version(HUNT_POOL_DEBUG_MORE) { 169 if(!r && _state == PooledThreadState.Busy) { 170 if(task is null) { 171 warningf("No task attatched on a busy thread %s in %s, task: %s", this.name, _timeout); 172 } else { 173 warningf("more tests need for this status, thread %s in %s", this.name, _timeout); 174 } 175 } 176 } 177 } 178 179 _mutex.unlock(); 180 181 if(task !is null) { 182 version(HUNT_POOL_DEBUG) { 183 tracef("Try to exeucte task [%s] in thread %s. task: %s", task.name, this.name, task.status); 184 } 185 task.execute(); 186 } 187 } 188 189 override string toString() { 190 import std.format; 191 string r = format("name: %s, state: %s", name, _state); 192 Task task = _task; 193 if(task !is null) { 194 return r ~ ", task: " ~ task.toString(); 195 } else { 196 return r; 197 } 198 199 } 200 }