1 module hunt.stream.TcpInputStream;
2 
3 import hunt.stream.Common;
4 import hunt.io.TcpStream;
5 import hunt.logging.ConsoleLogger;
6 
7 import core.time;
8 import std.algorithm;
9 import std.format;
10 
11 import hunt.io.ByteBuffer;
12 import hunt.io.BufferUtils;
13 import hunt.io.SimpleQueue;
14 
15 /**
16  *
17  */
18 class TcpInputStream : InputStream {
19 
20     private Duration _timeout;
21     private TcpStream _tcp;
22     private SimpleQueue!ByteBuffer _bufferQueue;
23 
24     this(TcpStream tcp, Duration timeout = 5.seconds) {
25         assert(tcp !is null);
26         _bufferQueue = new SimpleQueue!ByteBuffer();
27         _timeout = timeout;
28         _tcp = tcp;
29         _tcp.received(&dataReceived);
30     }
31 
32     private void dataReceived(ByteBuffer buffer) {
33         int size = buffer.remaining();
34         if(size == 0) {
35             warningf("Empty data");
36         } else {
37             // clone the buffer for data safe
38             ByteBuffer copy = BufferUtils.allocate(size);
39             copy.put(buffer).flip();
40             version(HUNT_NET_DEBUG) tracef("data enqueue (%s)...", copy.toString());
41             _bufferQueue.enqueue(copy);
42         }
43     }
44 
45     override int read(byte[] b, int off, int len) {
46         version(HUNT_NET_DEBUG) info("waitting for data....");
47         // TODO: Tasks pending completion -@zxp at 7/20/2019, 11:32:52 AM
48         // Support timeout
49         ByteBuffer buffer = _bufferQueue.dequeue(_timeout);
50         int r = buffer.remaining();
51         version(HUNT_NET_DEBUG) info("read....", buffer.toString());
52         r =  min(len, r);
53 
54         b[off .. off + r] = buffer.array[0 .. r];
55         return r;
56     }
57 
58     override int read() {
59         version(HUNT_NET_DEBUG) trace("waitting....");
60         ByteBuffer buffer = _bufferQueue.dequeue(_timeout);
61         version(HUNT_NET_DEBUG) trace("read....");
62         return buffer.get();
63     }
64 
65 }
66