1 module hunt.stream.TcpInputStream; 2 3 import hunt.stream.Common; 4 import hunt.io.TcpStream; 5 import hunt.io.channel.Common; 6 import hunt.logging.ConsoleLogger; 7 8 import core.time; 9 import std.algorithm; 10 import std.format; 11 12 import hunt.io.ByteBuffer; 13 import hunt.io.BufferUtils; 14 import hunt.io.SimpleQueue; 15 16 /** 17 * 18 */ 19 class TcpInputStream : InputStream { 20 21 private Duration _timeout; 22 private TcpStream _tcp; 23 private SimpleQueue!ByteBuffer _bufferQueue; 24 25 this(TcpStream tcp, Duration timeout = 5.seconds) { 26 assert(tcp !is null); 27 _bufferQueue = new SimpleQueue!ByteBuffer(); 28 _timeout = timeout; 29 _tcp = tcp; 30 _tcp.received(&dataReceived); 31 } 32 33 private DataHandleStatus dataReceived(ByteBuffer buffer) { 34 int size = buffer.remaining(); 35 if(size == 0) { 36 warningf("Empty data"); 37 } else { 38 // clone the buffer for data safe 39 ByteBuffer copy = BufferUtils.allocate(size); 40 copy.put(buffer).flip(); 41 version(HUNT_NET_DEBUG) tracef("data enqueue (%s)...", copy.toString()); 42 _bufferQueue.enqueue(copy); 43 44 buffer.clear(); 45 buffer.flip(); 46 } 47 48 return DataHandleStatus.Done; 49 } 50 51 override int read(byte[] b, int off, int len) { 52 version(HUNT_NET_DEBUG) info("waitting for data...."); 53 // TODO: Tasks pending completion -@zxp at 7/20/2019, 11:32:52 AM 54 // Support timeout 55 ByteBuffer buffer = _bufferQueue.dequeue(_timeout); 56 int r = buffer.remaining(); 57 version(HUNT_NET_DEBUG) info("read....", buffer.toString()); 58 r = min(len, r); 59 60 b[off .. off + r] = buffer.array[0 .. r]; 61 return r; 62 } 63 64 override int read() { 65 version(HUNT_NET_DEBUG) trace("waitting...."); 66 ByteBuffer buffer = _bufferQueue.dequeue(_timeout); 67 version(HUNT_NET_DEBUG) trace("read...."); 68 return buffer.get(); 69 } 70 71 } 72