1 /* 2 * Hunt - A refined core library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.stream.BufferedInputStream; 13 14 import hunt.stream.Common; 15 import hunt.stream.FilterInputStream; 16 import hunt.Exceptions; 17 18 version(HUNT_DEBUG) { 19 import hunt.logging; 20 } 21 22 /** 23 * A <code>BufferedInputStream</code> adds 24 * functionality to another input stream-namely, 25 * the ability to buffer the input and to 26 * support the <code>mark</code> and <code>reset</code> 27 * methods. When the <code>BufferedInputStream</code> 28 * is created, an internal buffer array is 29 * created. As bytes from the stream are read 30 * or skipped, the internal buffer is refilled 31 * as necessary from the contained input stream, 32 * many bytes at a time. The <code>mark</code> 33 * operation remembers a point in the input 34 * stream and the <code>reset</code> operation 35 * causes all the bytes read since the most 36 * recent <code>mark</code> operation to be 37 * reread before new bytes are taken from 38 * the contained input stream. 39 * 40 * @author Arthur van Hoff 41 */ 42 class BufferedInputStream : FilterInputStream { 43 44 private enum int DEFAULT_BUFFER_SIZE = 8192; 45 46 /** 47 * The maximum size of array to allocate. 48 * Some VMs reserve some header words in an array. 49 * Attempts to allocate larger arrays may result in 50 * OutOfMemoryError: Requested array size exceeds VM limit 51 */ 52 private enum int MAX_BUFFER_SIZE = int.max - 8; 53 54 /** 55 * As this class is used early during bootstrap, it's motivated to use 56 * Unsafe.compareAndSetObject instead of AtomicReferenceFieldUpdater 57 * (or VarHandles) to reduce dependencies and improve startup time. 58 */ 59 // private static final Unsafe U = Unsafe.getUnsafe(); 60 61 // private static final long BUF_OFFSET 62 // = U.objectFieldOffset(BufferedInputStream.class, "buf"); 63 64 /** 65 * The internal buffer array where the data is stored. When necessary, 66 * it may be replaced by another array of 67 * a different size. 68 */ 69 /* 70 * We null this out with a CAS on close(), which is necessary since 71 * closes can be asynchronous. We use nullness of buf[] as primary 72 * indicator that this stream is closed. (The "in" field is also 73 * nulled out on close.) 74 */ 75 protected byte[] buf; 76 77 /** 78 * The index one greater than the index of the last valid byte in 79 * the buffer. 80 * This value is always 81 * in the range <code>0</code> through <code>buf.length</code>; 82 * elements <code>buf[0]</code> through <code>buf[count-1] 83 * </code>contain buffered input data obtained 84 * from the underlying input stream. 85 */ 86 protected int count; 87 88 /** 89 * The current position in the buffer. This is the index of the next 90 * character to be read from the <code>buf</code> array. 91 * <p> 92 * This value is always in the range <code>0</code> 93 * through <code>count</code>. If it is less 94 * than <code>count</code>, then <code>buf[pos]</code> 95 * is the next byte to be supplied as input; 96 * if it is equal to <code>count</code>, then 97 * the next <code>read</code> or <code>skip</code> 98 * operation will require more bytes to be 99 * read from the contained input stream. 100 * 101 * @see java.io.BufferedInputStream#buf 102 */ 103 protected int pos; 104 105 /** 106 * The value of the <code>pos</code> field at the time the last 107 * <code>mark</code> method was called. 108 * <p> 109 * This value is always 110 * in the range <code>-1</code> through <code>pos</code>. 111 * If there is no marked position in the input 112 * stream, this field is <code>-1</code>. If 113 * there is a marked position in the input 114 * stream, then <code>buf[markpos]</code> 115 * is the first byte to be supplied as input 116 * after a <code>reset</code> operation. If 117 * <code>markpos</code> is not <code>-1</code>, 118 * then all bytes from positions <code>buf[markpos]</code> 119 * through <code>buf[pos-1]</code> must remain 120 * in the buffer array (though they may be 121 * moved to another place in the buffer array, 122 * with suitable adjustments to the values 123 * of <code>count</code>, <code>pos</code>, 124 * and <code>markpos</code>); they may not 125 * be discarded unless and until the difference 126 * between <code>pos</code> and <code>markpos</code> 127 * exceeds <code>marklimit</code>. 128 * 129 * @see java.io.BufferedInputStream#mark(int) 130 * @see java.io.BufferedInputStream#pos 131 */ 132 protected int markpos = -1; 133 134 /** 135 * The maximum read ahead allowed after a call to the 136 * <code>mark</code> method before subsequent calls to the 137 * <code>reset</code> method fail. 138 * Whenever the difference between <code>pos</code> 139 * and <code>markpos</code> exceeds <code>marklimit</code>, 140 * then the mark may be dropped by setting 141 * <code>markpos</code> to <code>-1</code>. 142 * 143 * @see java.io.BufferedInputStream#mark(int) 144 * @see java.io.BufferedInputStream#reset() 145 */ 146 protected int marklimit; 147 148 /** 149 * Creates a <code>BufferedInputStream</code> 150 * and saves its argument, the input stream 151 * <code>inputStream</code>, for later use. An internal 152 * buffer array is created and stored in <code>buf</code>. 153 * 154 * @param inputStream the underlying input stream. 155 */ 156 this(InputStream inputStream) { 157 this(inputStream, DEFAULT_BUFFER_SIZE); 158 } 159 160 /** 161 * Creates a <code>BufferedInputStream</code> 162 * with the specified buffer size, 163 * and saves its argument, the input stream 164 * <code>inputStream</code>, for later use. An internal 165 * buffer array of length <code>size</code> 166 * is created and stored in <code>buf</code>. 167 * 168 * @param inputStream the underlying input stream. 169 * @param size the buffer size. 170 * @exception IllegalArgumentException if {@code size <= 0}. 171 */ 172 this(InputStream inputStream, int size) { 173 super(inputStream); 174 if (size <= 0) { 175 throw new IllegalArgumentException("Buffer size <= 0"); 176 } 177 buf = new byte[size]; 178 } 179 180 /** 181 * Check to make sure that underlying input stream has not been 182 * nulled out due to close; if not return it; 183 */ 184 private InputStream getInIfOpen() { 185 InputStream input = inputStream; 186 if (input is null) 187 throw new IOException("Stream closed"); 188 return input; 189 } 190 191 /** 192 * Check to make sure that buffer has not been nulled out due to 193 * close; if not return it; 194 */ 195 private byte[] getBufIfOpen() { 196 byte[] buffer = buf; 197 if (buffer is null) 198 throw new IOException("Stream closed"); 199 return buffer; 200 } 201 202 /** 203 * Fills the buffer with more data, taking into account 204 * shuffling and other tricks for dealing with marks. 205 * Assumes that it is being called by a synchronized method. 206 * This method also assumes that all data has already been read in, 207 * hence pos > count. 208 */ 209 private void fill() { 210 byte[] buffer = getBufIfOpen(); 211 if (markpos < 0) 212 pos = 0; /* no mark: throw away the buffer */ 213 else if (pos >= buffer.length) { /* no room left in buffer */ 214 if (markpos > 0) { /* can throw away early part of the buffer */ 215 int sz = pos - markpos; 216 // System.arraycopy(buffer, markpos, buffer, 0, sz); 217 for(int i=0; i<sz; i++) { 218 buffer[i] = buffer[markpos+i]; 219 } 220 pos = sz; 221 markpos = 0; 222 } else if (buffer.length >= marklimit) { 223 markpos = -1; /* buffer got too big, invalidate mark */ 224 pos = 0; /* drop buffer contents */ 225 } else if (buffer.length >= MAX_BUFFER_SIZE) { 226 throw new OutOfMemoryError("Required array size too large"); 227 } else { /* grow buffer */ 228 int nsz = (pos <= MAX_BUFFER_SIZE - pos) ? 229 pos * 2 : MAX_BUFFER_SIZE; 230 if (nsz > marklimit) 231 nsz = marklimit; 232 byte[] nbuf = buffer[0..pos].dup; 233 // byte[] nbuf = new byte[nsz]; 234 // System.arraycopy(buffer, 0, nbuf, 0, pos); 235 // if (!U.compareAndSetObject(this, BUF_OFFSET, buffer, nbuf)) 236 if(this.buf !is buffer) { 237 // Can't replace buf if there was an async close. 238 // Note: This would need to be changed if fill() 239 // is ever made accessible to multiple threads. 240 // But for now, the only way CAS can fail is via close. 241 // assert buf is null; 242 throw new IOException("Stream closed"); 243 } 244 buffer = nbuf; 245 } 246 } 247 count = pos; 248 int n = getInIfOpen().read(buffer, pos, cast(int)buffer.length - pos); 249 if (n > 0) 250 count = n + pos; 251 } 252 253 /** 254 * See 255 * the general contract of the <code>read</code> 256 * method of <code>InputStream</code>. 257 * 258 * @return the next byte of data, or <code>-1</code> if the end of the 259 * stream is reached. 260 * @exception IOException if this input stream has been closed by 261 * invoking its {@link #close()} method, 262 * or an I/O error occurs. 263 * @see java.io.FilterInputStream#inputStream 264 */ 265 override int read() { 266 if (pos >= count) { 267 fill(); 268 if (pos >= count) 269 return -1; 270 } 271 return getBufIfOpen()[pos++] & 0xff; 272 } 273 274 /** 275 * Read characters into a portion of an array, reading from the underlying 276 * stream at most once if necessary. 277 */ 278 private int read1(byte[] b, int off, int len) { 279 int avail = count - pos; 280 if (avail <= 0) { 281 /* If the requested length is at least as large as the buffer, and 282 if there is no mark/reset activity, do not bother to copy the 283 bytes into the local buffer. In this way buffered streams will 284 cascade harmlessly. */ 285 if (len >= getBufIfOpen().length && markpos < 0) { 286 return getInIfOpen().read(b, off, len); 287 } 288 fill(); 289 avail = count - pos; 290 if (avail <= 0) return -1; 291 } 292 int cnt = (avail < len) ? avail : len; 293 b[off.. off+cnt] = getBufIfOpen()[pos .. pos+cnt]; 294 // System.arraycopy(getBufIfOpen(), pos, b, off, cnt); 295 pos += cnt; 296 return cnt; 297 } 298 299 /** 300 * Reads bytes from this byte-input stream into the specified byte array, 301 * starting at the given offset. 302 * 303 * <p> This method implements the general contract of the corresponding 304 * <code>{@link InputStream#read(byte[], int, int) read}</code> method of 305 * the <code>{@link InputStream}</code> class. As an additional 306 * convenience, it attempts to read as many bytes as possible by repeatedly 307 * invoking the <code>read</code> method of the underlying stream. This 308 * iterated <code>read</code> continues until one of the following 309 * conditions becomes true: <ul> 310 * 311 * <li> The specified number of bytes have been read, 312 * 313 * <li> The <code>read</code> method of the underlying stream returns 314 * <code>-1</code>, indicating end-of-file, or 315 * 316 * <li> The <code>available</code> method of the underlying stream 317 * returns zero, indicating that further input requests would block. 318 * 319 * </ul> If the first <code>read</code> on the underlying stream returns 320 * <code>-1</code> to indicate end-of-file then this method returns 321 * <code>-1</code>. Otherwise this method returns the number of bytes 322 * actually read. 323 * 324 * <p> Subclasses of this class are encouraged, but not required, to 325 * attempt to read as many bytes as possible in the same fashion. 326 * 327 * @param b destination buffer. 328 * @param off offset at which to start storing bytes. 329 * @param len maximum number of bytes to read. 330 * @return the number of bytes read, or <code>-1</code> if the end of 331 * the stream has been reached. 332 * @exception IOException if this input stream has been closed by 333 * invoking its {@link #close()} method, 334 * or an I/O error occurs. 335 */ 336 override int read(byte[] b, int off, int len) { 337 getBufIfOpen(); // Check for closed stream 338 if ((off | len | (off + len) | (b.length - (off + len))) < 0) { 339 throw new IndexOutOfBoundsException(); 340 } else if (len == 0) { 341 return 0; 342 } 343 344 int n = 0; 345 for (;;) { 346 int nread = read1(b, off + n, len - n); 347 if (nread <= 0) 348 return (n == 0) ? nread : n; 349 n += nread; 350 if (n >= len) 351 return n; 352 // if not closed but no bytes available, return 353 InputStream input = inputStream; 354 if (input !is null && input.available() <= 0) 355 return n; 356 } 357 } 358 359 /** 360 * See the general contract of the <code>skip</code> 361 * method of <code>InputStream</code>. 362 * 363 * @throws IOException if this input stream has been closed by 364 * invoking its {@link #close()} method, 365 * {@code inputStream.skip(n)} throws an IOException, 366 * or an I/O error occurs. 367 */ 368 override long skip(long n) { 369 getBufIfOpen(); // Check for closed stream 370 if (n <= 0) { 371 return 0; 372 } 373 long avail = count - pos; 374 375 if (avail <= 0) { 376 // If no mark position set then don't keep in buffer 377 if (markpos <0) 378 return getInIfOpen().skip(n); 379 380 // Fill in buffer to save bytes for reset 381 fill(); 382 avail = count - pos; 383 if (avail <= 0) 384 return 0; 385 } 386 387 long skipped = (avail < n) ? avail : n; 388 pos += skipped; 389 return skipped; 390 } 391 392 /** 393 * Returns an estimate of the number of bytes that can be read (or 394 * skipped over) from this input stream without blocking by the next 395 * invocation of a method for this input stream. The next invocation might be 396 * the same thread or another thread. A single read or skip of this 397 * many bytes will not block, but may read or skip fewer bytes. 398 * <p> 399 * This method returns the sum of the number of bytes remaining to be read in 400 * the buffer (<code>count - pos</code>) and the result of calling the 401 * {@link java.io.FilterInputStream#inputStream inputStream}.available(). 402 * 403 * @return an estimate of the number of bytes that can be read (or skipped 404 * over) from this input stream without blocking. 405 * @exception IOException if this input stream has been closed by 406 * invoking its {@link #close()} method, 407 * or an I/O error occurs. 408 */ 409 override int available() { 410 int n = count - pos; 411 int avail = getInIfOpen().available(); 412 return n > (int.max - avail) ? int.max : n + avail; 413 } 414 415 /** 416 * See the general contract of the <code>mark</code> 417 * method of <code>InputStream</code>. 418 * 419 * @param readlimit the maximum limit of bytes that can be read before 420 * the mark position becomes invalid. 421 * @see java.io.BufferedInputStream#reset() 422 */ 423 override void mark(int readlimit) { 424 marklimit = readlimit; 425 markpos = pos; 426 } 427 428 /** 429 * See the general contract of the <code>reset</code> 430 * method of <code>InputStream</code>. 431 * <p> 432 * If <code>markpos</code> is <code>-1</code> 433 * (no mark has been set or the mark has been 434 * invalidated), an <code>IOException</code> 435 * is thrown. Otherwise, <code>pos</code> is 436 * set equal to <code>markpos</code>. 437 * 438 * @exception IOException if this stream has not been marked or, 439 * if the mark has been invalidated, or the stream 440 * has been closed by invoking its {@link #close()} 441 * method, or an I/O error occurs. 442 * @see java.io.BufferedInputStream#mark(int) 443 */ 444 override void reset() { 445 getBufIfOpen(); // Cause exception if closed 446 if (markpos < 0) 447 throw new IOException("Resetting to invalid mark"); 448 pos = markpos; 449 } 450 451 override void position(int index) { 452 getBufIfOpen(); // Cause exception if closed 453 if(index < 0 || index > count + inputStream.available()) 454 throw new IOException("Out of range"); 455 456 if(index >= count) { 457 inputStream.position(index - count); 458 pos = count; 459 } else { 460 inputStream.position(0); 461 pos = index; 462 } 463 } 464 465 466 /** 467 * Tests if this input stream supports the <code>mark</code> 468 * and <code>reset</code> methods. The <code>markSupported</code> 469 * method of <code>BufferedInputStream</code> returns 470 * <code>true</code>. 471 * 472 * @return a <code>bool</code> indicating if this stream type supports 473 * the <code>mark</code> and <code>reset</code> methods. 474 * @see java.io.InputStream#mark(int) 475 * @see java.io.InputStream#reset() 476 */ 477 override bool markSupported() { 478 return true; 479 } 480 481 /** 482 * Closes this input stream and releases any system resources 483 * associated with the stream. 484 * Once the stream has been closed, further read(), available(), reset(), 485 * or skip() invocations will throw an IOException. 486 * Closing a previously closed stream has no effect. 487 * 488 * @exception IOException if an I/O error occurs. 489 */ 490 override void close() { 491 byte[] buffer; 492 while ( (buffer = buf) !is null) { 493 // if (U.compareAndSetObject(this, BUF_OFFSET, buffer, null)) 494 if(this.buf is buffer) { 495 this.buf = null; 496 InputStream input = inputStream; 497 inputStream = null; 498 if (input !is null) 499 input.close(); 500 return; 501 } 502 // Else retry in case a new buf was CASed in fill() 503 } 504 } 505 }