1 module hunt.stream.TcpInputStream;
2 
3 import hunt.stream.Common;
4 import hunt.io.TcpStream;
5 import hunt.io.channel.Common;
6 import hunt.logging;
7 
8 import core.time;
9 import std.algorithm;
10 import std.format;
11 
12 import hunt.io.ByteBuffer;
13 import hunt.io.BufferUtils;
14 import hunt.io.SimpleQueue;
15 
16 /**
17  *
18  */
19 class TcpInputStream : InputStream {
20 
21     private Duration _timeout;
22     private TcpStream _tcp;
23     private SimpleQueue!ByteBuffer _bufferQueue;
24 
25     this(TcpStream tcp, Duration timeout = 5.seconds) {
26         assert(tcp !is null);
27         _bufferQueue = new SimpleQueue!ByteBuffer();
28         _timeout = timeout;
29         _tcp = tcp;
30         _tcp.received(&dataReceived);
31     }
32 
33     private DataHandleStatus dataReceived(ByteBuffer buffer) {
34         int size = buffer.remaining();
35         if(size == 0) {
36             warningf("Empty data");
37         } else {
38             // clone the buffer for data safe
39             ByteBuffer copy = BufferUtils.allocate(size);
40             copy.put(buffer).flip();
41             version(HUNT_NET_DEBUG) tracef("data enqueue (%s)...", copy.toString());
42             _bufferQueue.enqueue(copy);
43             
44             buffer.clear();
45             buffer.flip();
46         }
47         
48         return DataHandleStatus.Done;
49     }
50 
51     override int read(byte[] b, int off, int len) {
52         version(HUNT_NET_DEBUG) info("waitting for data....");
53         // TODO: Tasks pending completion -@zxp at 7/20/2019, 11:32:52 AM
54         // Support timeout
55         ByteBuffer buffer = _bufferQueue.dequeue(_timeout);
56         int r = buffer.remaining();
57         version(HUNT_NET_DEBUG) info("read....", buffer.toString());
58         r =  min(len, r);
59 
60         b[off .. off + r] = buffer.array[0 .. r];
61         return r;
62     }
63 
64     override int read() {
65         version(HUNT_NET_DEBUG) trace("waitting....");
66         ByteBuffer buffer = _bufferQueue.dequeue(_timeout);
67         version(HUNT_NET_DEBUG) trace("read....");
68         return buffer.get();
69     }
70 
71 }
72