module hunt.stream.TcpInputStream;

import hunt.stream.Common;
import hunt.io.TcpStream;
import hunt.logging.ConsoleLogger;

import core.time;
import std.algorithm;
import std.format;

import hunt.io.ByteBuffer;
import hunt.io.BufferUtils;
import hunt.io.SimpleQueue;

/**
 * An InputStream view over the data received by a TcpStream.
 *
 * Every chunk delivered to the receive handler is copied and appended to an
 * internal queue. The read methods drain that queue in arrival order; a chunk
 * that is only partially consumed by one call is kept and continued by the
 * next call, so no received byte is dropped.
 */
class TcpInputStream : InputStream {

    private Duration _timeout;
    private TcpStream _tcp;
    private SimpleQueue!ByteBuffer _bufferQueue;

    // Chunk that has been taken from the queue but not fully consumed yet.
    // Only touched by the read methods.
    private ByteBuffer _current;

    /**
     * Params:
     *   tcp     = the stream to read from; must not be null. Its receive
     *             handler is set to this object's dataReceived.
     *   timeout = value passed to every SimpleQueue.dequeue call made while
     *             waiting for data.
     */
    this(TcpStream tcp, Duration timeout = 5.seconds) {
        assert(tcp !is null);
        _bufferQueue = new SimpleQueue!ByteBuffer();
        _timeout = timeout;
        _tcp = tcp;
        _tcp.received(&dataReceived);
    }

    /**
     * Receive handler: copies the remaining bytes of `buffer` into a fresh
     * buffer and enqueues the copy. Empty buffers are only logged.
     */
    private void dataReceived(ByteBuffer buffer) {
        int size = buffer.remaining();
        if(size == 0) {
            warningf("Empty data");
        } else {
            // Clone the data: the queued copy stays valid whatever the
            // caller does with `buffer` after this handler returns.
            ByteBuffer copy = BufferUtils.allocate(size);
            copy.put(buffer).flip();
            version(HUNT_NET_DEBUG) tracef("data enqueue (%s)...", copy.toString());
            _bufferQueue.enqueue(copy);
        }
    }

    /**
     * Returns the chunk to read from next: the partially consumed one if it
     * still has bytes left, otherwise the next chunk taken from the queue.
     *
     * Returns: a buffer to read from, or null when dequeue produced nothing.
     */
    private ByteBuffer nextBuffer() {
        if(_current is null || _current.remaining() == 0) {
            // NOTE(review): presumably dequeue(timeout) yields null when the
            // timeout elapses -- confirm against SimpleQueue.
            _current = _bufferQueue.dequeue(_timeout);
        }
        return _current;
    }

    /**
     * Reads up to `len` bytes into `b[off .. off + len]`.
     *
     * At most one queued chunk is consumed per call, so fewer than `len`
     * bytes may be returned even when more data is queued.
     *
     * Returns: the number of bytes copied; 0 when `len` is not positive;
     *          -1 when no data could be obtained from the queue.
     */
    override int read(byte[] b, int off, int len) {
        if(len <= 0) {
            // Nothing requested: do not block and do not consume data.
            return 0;
        }

        version(HUNT_NET_DEBUG) info("waitting for data....");
        ByteBuffer buffer = nextBuffer();
        if(buffer is null) {
            version(HUNT_NET_DEBUG) warningf("no data available after %s", _timeout);
            return -1;
        }

        int r = buffer.remaining();
        version(HUNT_NET_DEBUG) info("read....", buffer.toString());
        r = min(len, r);

        // The relative bulk get copies from the buffer's current position and
        // advances it, so the unread tail is served by the next call.
        buffer.get(b, off, r);
        return r;
    }

    /**
     * Reads a single byte.
     *
     * Returns: the byte as an unsigned value in 0 .. 255, or -1 when no data
     *          could be obtained from the queue.
     */
    override int read() {
        version(HUNT_NET_DEBUG) trace("waitting....");
        ByteBuffer buffer = nextBuffer();
        if(buffer is null) {
            version(HUNT_NET_DEBUG) warningf("no data available after %s", _timeout);
            return -1;
        }
        version(HUNT_NET_DEBUG) trace("read....");
        // Mask to keep the result non-negative; a raw 0xFF byte would
        // otherwise widen to -1 and look like "no data".
        return buffer.get() & 0xFF;
    }

}