1 module hunt.util.thread.PooledThread;
2 
3 import hunt.Functions;
4 import hunt.logging;
5 import hunt.util.thread.Task;
6 
7 import core.atomic;
8 // import core.memory;
9 import core.thread;
10 import core.sync.condition;
11 import core.sync.mutex;
12 
13 import std.conv;
14 import std.format;
15 
16 
/**
 * Life-cycle states of a PooledThread.
 *
 * Member order is significant: the first member is the type's `.init`
 * value, so a default-initialized state is `Idle`.
 */
enum PooledThreadState {
    /// No task is attached; PooledThread.attatch() may claim the thread.
    Idle,

    /// Occupied: claimed by attatch() and not yet released by the worker loop.
    Busy,

    /// stop() has been requested; the worker loop exits once it observes this.
    Stopped,
}
22 
/**
 * Type of the "task done" callback registered through
 * PooledThread.onTaskDone(). The worker loop invokes it as
 * `handler(thread, task)` with the thread itself and the task it has just
 * finished processing.
 */
alias PooledThreadHandler = Action2!(PooledThread, Task);
24 
/**
 * A worker thread that runs one Task at a time.
 *
 * The thread blocks on a condition variable until attatch() hands it a task
 * or stop() is called. After each task it returns to the Idle state and
 * invokes the optional "task done" handler.
 *
 * State transitions:
 *   Idle -> Busy     performed by attatch() (compare-and-swap)
 *   Busy -> Idle     performed by the worker loop after a task has run
 *   any  -> Stopped  performed by stop()
 */
class PooledThread : Thread {

    // Accessed from several threads; always go through core.atomic.
    private shared PooledThreadState _state;
    private size_t _index;
    // The task currently attached. Written by attatch() only after it has won
    // the Idle -> Busy transition; cleared by the worker before it goes Idle.
    private Task _task;
    // Upper bound for a single wait on the condition variable.
    private Duration _timeout;

    private Condition _condition;
    private Mutex _mutex;

    private PooledThreadHandler _taskDoneHandler;

    /**
     * Creates the worker (it does not start running until Thread.start()).
     *
     * Params:
     *   index     = number of this thread; used to build its name
     *               ("PooledThread-<index>") and returned by index().
     *   timeout   = maximum duration of one wait for a task; when it
     *               elapses the worker re-checks its state and waits again.
     *   stackSize = forwarded unchanged to the Thread constructor.
     */
    this(size_t index, Duration timeout = 5.seconds, size_t stackSize = 0) {
        // Construct the base class first so that no Thread member (such as
        // `name`) is touched before the base constructor has run.
        super(&run, stackSize);

        _index = index;
        _timeout = timeout;
        _state = PooledThreadState.Idle;
        _mutex = new Mutex();
        _condition = new Condition(_mutex);
        this.name = "PooledThread-" ~ _index.to!string();
    }

    /**
     * Registers the callback invoked after each task has been processed.
     * A later call replaces the previously registered handler.
     */
    void onTaskDone(PooledThreadHandler handler) {
        _taskDoneHandler = handler;
    }

    /**
     * Requests the worker loop to terminate and wakes it up if it is
     * waiting for a task. This only sets the state and notifies; it does not
     * wait for the thread to finish.
     */
    void stop() {
        version(HUNT_POOL_DEBUG_MORE) {
            infof("Stopping thread %s", this.name);
        }

        _mutex.lock();
        scope (exit) {
            version(HUNT_POOL_DEBUG_MORE) {
                infof("thread [%s] stopped", this.name);
            }
            _mutex.unlock();
        }
        atomicStore(_state, PooledThreadState.Stopped);
        _condition.notify();
    }

    /// Returns: true while the thread is in the Busy state.
    bool isBusy() {
        return atomicLoad(_state) == PooledThreadState.Busy;
    }
    
    /// Returns: true while the thread is in the Idle state.
    bool isIdle() {
        return atomicLoad(_state) == PooledThreadState.Idle;
    }

    /// Returns: the current state of the thread.
    PooledThreadState state() {
        return atomicLoad(_state);
    }

    /// Returns: the index passed to the constructor.
    size_t index() {
        return _index;
    }

    /// Returns: the task currently attached, or null if there is none.
    Task task() {
        return _task;
    }

    /**
     * Hands a task to this thread and wakes the worker.
     *
     * Params:
     *   task = the task to run; must not be null.
     *
     * Returns: true (failure is reported by an exception).
     *
     * Throws: Exception if the thread is not Idle (Busy or Stopped).
     */
    bool attatch(Task task) {
        assert(task !is null);
        version(HUNT_POOL_DEBUG_MORE) {
            tracef("attatching task %s with thread %s (state: %s)", task.name, this.name, atomicLoad(_state));
        }

        // Claim the thread atomically; only one caller can win Idle -> Busy.
        if (!cas(&_state, PooledThreadState.Idle, PooledThreadState.Busy)) {
            string msg = format("Thread [%s] is unavailable. state: %s", this.name(), atomicLoad(_state));
            // Log through an explicit "%s": the message is data, not a format
            // string, and may itself contain '%' characters.
            warningf("%s", msg);
            throw new Exception(msg);
        }

        _mutex.lock();
        scope (exit) {
            version(HUNT_POOL_DEBUG) {
                infof("task [%s] attatched with thread %s", task.name, this.name);
            }
            _mutex.unlock();
        }
        _task = task;
        _condition.notify();

        return true;
    }

    /**
     * Thread entry point: processes tasks until the state becomes Stopped.
     * Anything thrown while waiting for or executing a task, or by the
     * "task done" handler, is logged and does not terminate the loop.
     */
    private void run() nothrow {
        while (atomicLoad(_state) != PooledThreadState.Stopped) {

            try {
                doRun();
            } catch (Throwable ex) {
                warning(ex);
            } 

            version (HUNT_POOL_DEBUG_MORE) {
                tracef("%s Done. state: %s", this.name(), atomicLoad(_state));
            }

            // Take the finished task out of the slot BEFORE publishing Idle.
            // As soon as the state is Idle another thread may attatch() a new
            // task; clearing the slot afterwards could erase that new task
            // and leave the thread Busy with nothing to run.
            Task task = _task;
            _task = null;

            if (atomicLoad(_state) != PooledThreadState.Stopped) {
                bool r = cas(&_state, PooledThreadState.Busy, PooledThreadState.Idle);
                version(HUNT_POOL_DEBUG_MORE) {
                    if(!r) {
                        warningf("Failed to set thread %s to Idle, its state is %s", this.name, atomicLoad(_state));
                    }
                }
            }

            version(HUNT_POOL_DEBUG_MORE) tracef("Run done: %s", this.toString());

            if(_taskDoneHandler !is null && task !is null) {
                try {
                    _taskDoneHandler(this, task);
                } catch(Throwable t) {
                    warning(t);
                }
            }
        }
        
        version (HUNT_POOL_DEBUG) {
            tracef("Thread [%s] stopped. State: %s", this.name(), atomicLoad(_state));
        }
    }

    // Not referenced anywhere in this class.
    private bool _isWaiting = false;

    /**
     * Waits until a task is attached or the thread is stopped, then executes
     * the task (if any) outside of the lock.
     */
    private void doRun() {
        Task task;

        {
            _mutex.lock();
            // Release the mutex on every exit path, including an exception
            // thrown by Condition.wait or by the logging calls below.
            scope (exit) _mutex.unlock();

            task = _task;
            while (task is null && atomicLoad(_state) != PooledThreadState.Stopped) {
                // Returns false when the timeout elapsed without a notify.
                bool r = _condition.wait(_timeout);
                task = _task;

                version(HUNT_POOL_DEBUG_MORE) {
                    if(!r && atomicLoad(_state) == PooledThreadState.Busy) {
                        if(task is null) {
                            warningf("No task attatched on a busy thread %s in %s", this.name, _timeout);
                        } else {
                            warningf("more tests need for this status, thread %s in %s", this.name, _timeout);
                        }
                    }
                }
            }
        }

        if(task !is null) {
            version(HUNT_POOL_DEBUG) {
                tracef("Try to exeucte task [%s] in thread %s. task: %s", task.name, this.name, task.status);
            }
            task.execute();
        }
    }
    
    /// Returns: the thread name and state, plus the attached task if any.
    override string toString() {
        import std.format;
        string r = format("name: %s, state: %s", name, atomicLoad(_state));
        Task task = _task;
        if(task !is null) {
            return r ~ ", task: " ~ task.toString();
        } else {
            return r;
        }
        
    }
}