src/io/BufferedReader.php (209 lines of code) (raw):

<?hh
/*
 *  Copyright (c) 2017-present, Facebook, Inc.
 *  All rights reserved.
 *
 *  This source code is licensed under the MIT license found in the
 *  LICENSE file in the hphp/hsl/ subdirectory of this source tree.
 *
 */

/* @lint-ignore-every AWAIT_IN_LOOP */

namespace HH\Lib\IO;

use namespace HH\Lib\{IO, Math, OS, Str};
use namespace HH\Lib\_Private\_OS;

/** Wrapper for `ReadHandle`s, with buffered line-based and byte-based
 * accessors.
 *
 * - `readLineAsync()` is similar to `fgets()`
 * - `readUntilAsync()` is a more general form
 * - `readByteAsync()` is similar to `fgetc()`
 *
 * Internal state:
 * - `$buffer` holds bytes already pulled from the wrapped handle that have
 *   not been handed to a caller yet.
 * - `$eof` records that the wrapped handle produced an empty read.
 *
 * The two are independent: `$eof` can become true while `$buffer` is still
 * non-empty (e.g. `readFixedSizeAsync()` asked for more than the handle had).
 * Every accessor therefore drains `$buffer` before honoring `$eof`, so
 * buffered bytes are never stranded.
 */
final class BufferedReader implements IO\ReadHandle {
  use ReadHandleConvenienceMethodsTrait;

  /** @param $handle the underlying handle that all reads are served from */
  public function __construct(private IO\ReadHandle $handle) {
  }

  /** Return the wrapped handle that was passed to the constructor. */
  public function getHandle(): IO\ReadHandle {
    return $this->handle;
  }

  // Set once the wrapped handle has returned an empty read.
  private bool $eof = false;
  // Bytes read from the wrapped handle but not yet returned to a caller.
  private string $buffer = '';

  // implementing interface
  /** Synchronously return up to `$max_bytes` bytes.
   *
   * Buffered data is served first; the wrapped handle's `readImpl()` is only
   * called when the buffer is empty and end of file has not been seen yet.
   *
   * @param $max_bytes maximum number of bytes to return, or null for
   *   "everything currently available"; must be greater than 0 if set
   * @return the data, or the empty string once end of file has been reached
   *   and the buffer is drained
   */
  public function readImpl(?int $max_bytes = null): string {
    _OS\arg_assert(
      $max_bytes is null || $max_bytes > 0,
      "Max bytes must be null, or greater than 0",
    );

    if ($this->buffer === '') {
      // Only an *empty* buffer makes the EOF flag relevant: bytes buffered
      // before EOF was observed must still be delivered.
      if ($this->eof) {
        return '';
      }
      $this->buffer = $this->getHandle()->readImpl();
      if ($this->buffer === '') {
        $this->eof = true;
        return '';
      }
    }

    $buf = $this->buffer;
    if ($max_bytes is null || $max_bytes >= Str\length($buf)) {
      // The caller can take everything we have.
      $this->buffer = '';
      return $buf;
    }
    // Hand out the first `$max_bytes` bytes and keep the remainder.
    $this->buffer = Str\slice($buf, $max_bytes);
    return Str\slice($buf, 0, $max_bytes);
  }

  /** Asynchronously return up to `$max_bytes` bytes.
   *
   * If the buffer is empty and end of file has not been seen, one read is
   * awaited on the wrapped handle first; the result is then served exactly
   * like `readImpl()`.
   *
   * @param $max_bytes maximum number of bytes to return, or null; must be
   *   greater than 0 if set
   * @param $timeout_ns timeout passed to the wrapped handle, or null; must be
   *   greater than 0 if set
   * @return the data, or the empty string once end of file has been reached
   *   and the buffer is drained
   */
  public async function readAllowPartialSuccessAsync(
    ?int $max_bytes = null,
    ?int $timeout_ns = null,
  ): Awaitable<string> {
    _OS\arg_assert(
      $max_bytes is null || $max_bytes > 0,
      "Max bytes must be null, or greater than 0",
    );
    _OS\arg_assert(
      $timeout_ns is null || $timeout_ns > 0,
      "Timeout must be null, or greater than 0",
    );

    if ($this->buffer === '') {
      if ($this->eof) {
        return '';
      }
      await $this->fillBufferAsync(null, $timeout_ns);
    }

    // We either have a buffer, or reached EOF; either way, behavior matches
    // read, so just delegate
    return $this->readImpl($max_bytes);
  }

  /** Read until the specified suffix is seen.
   *
   * The trailing suffix is read (so won't be returned by other calls), but is
   * not included in the return value.
   *
   * This call returns null if the suffix is not seen, even if there is other
   * data. In that case everything read so far stays in the buffer, so it can
   * still be retrieved with other calls.
   *
   * @see `readUntilxAsync()` if you want to throw EPIPE instead of returning
   *   null
   * @see `linesIterator()` if you want to iterate over all lines
   * @see `readLineAsync()` if you want trailing data instead of null
   */
  public async function readUntilAsync(string $suffix): Awaitable<?string> {
    $buf = $this->buffer;
    $idx = Str\search($buf, $suffix);
    $suffix_len = Str\length($suffix);
    if ($idx !== null) {
      // Already fully buffered: no I/O needed.
      $this->buffer = Str\slice($buf, $idx + $suffix_len);
      return Str\slice($buf, 0, $idx);
    }

    do {
      // + 1 as it would have been matched in the previous iteration if it
      // fully fit in the chunk
      $offset = Math\maxva(0, Str\length($buf) - $suffix_len + 1);
      $chunk = await $this->handle->readAllowPartialSuccessAsync();
      if ($chunk === '') {
        // No more data: keep what we have for other accessors.
        $this->buffer = $buf;
        return null;
      }
      $buf .= $chunk;
      $idx = Str\search($buf, $suffix, $offset);
    } while ($idx === null);

    $this->buffer = Str\slice($buf, $idx + $suffix_len);
    return Str\slice($buf, 0, $idx);
  }

  /** Read until the suffix, or raise EPIPE if the separator is not seen.
   *
   * This is similar to `readUntilAsync()`, however it raises EPIPE instead
   * of returning null.
   */
  public async function readUntilxAsync(string $suffix): Awaitable<string> {
    $ret = await $this->readUntilAsync($suffix);
    if ($ret === null) {
      throw new OS\BrokenPipeException(
        OS\Errno::EPIPE,
        'Marker/suffix not found before end of file',
      );
    }
    return $ret;
  }

  /** Read until the platform end-of-line sequence is seen, or EOF is reached.
   *
   * On current platforms, this is always `\n`; it may have other values on
   * other platforms in the future, e.g. `\r\n`.
   *
   * The newline sequence is read (so won't be returned by other calls), but is
   * not included in the return value.
   *
   * - Returns null if the end of file is reached with no data.
   * - Returns a string otherwise
   *
   * Some illustrative edge cases:
   * - `''` is considered a 0-line input
   * - `'foo'` is considered a 1-line input
   * - `"foo\nbar"` is considered a 2-line input
   * - `"foo\nbar\n"` is also considered a 2-line input
   *
   * @see `linesIterator()` for an iterator
   * @see `readLinexAsync()` to throw EPIPE instead of returning null
   * @see `readUntilAsync()` for a more general form
   */
  public async function readLineAsync(): Awaitable<?string> {
    try {
      $line = await $this->readUntilAsync("\n");
    } catch (OS\ErrnoException $ex) {
      if ($ex->getErrno() === OS\Errno::EBADF) {
        // Eg foreach ($stdin->linesIterator()) when stdin is closed
        return null;
      }
      throw $ex;
    }

    if ($line !== null) {
      return $line;
    }

    // No newline before the end of the data: whatever is left is the final,
    // unterminated line (or nothing at all).
    $line = await $this->readAllAsync();
    return $line === '' ? null : $line;
  }

  /** Read a line or throw EPIPE.
   *
   * @see `readLineAsync()` for details.
   */
  public async function readLinexAsync(): Awaitable<string> {
    $line = await $this->readLineAsync();
    if ($line !== null) {
      return $line;
    }
    throw new OS\BrokenPipeException(OS\Errno::EPIPE, 'No more lines to read.');
  }

  /** Iterate over all lines in the file.
   *
   * Usage:
   *
   * ```
   * foreach ($reader->linesIterator() await as $line) {
   *   do_stuff($line);
   * }
   * ```
   */
  public function linesIterator(): AsyncIterator<string> {
    return new BufferedReaderLineIterator($this);
  }

  /** Read exactly `$size` bytes.
   *
   * Keeps filling the buffer until it holds `$size` bytes or end of file is
   * seen. A request the buffer can already satisfy succeeds even if end of
   * file was observed earlier.
   *
   * @param $size the exact number of bytes to return
   * @param $timeout_ns overall time budget shared by all underlying reads,
   *   or null for none; ETIMEDOUT is raised when it is exhausted
   * @throws OS\BrokenPipeException (EPIPE) if end of file is reached before
   *   `$size` bytes are available; bytes read so far stay in the buffer
   */
  <<__Override>> // from trait
  public async function readFixedSizeAsync(
    int $size,
    ?int $timeout_ns = null,
  ): Awaitable<string> {
    $timer = new \HH\Lib\_Private\OptionalIncrementalTimeout(
      $timeout_ns,
      () ==> {
        _OS\throw_errno(
          OS\Errno::ETIMEDOUT,
          "Reached timeout before reading requested amount of data",
        );
      },
    );
    while (Str\length($this->buffer) < $size && !$this->eof) {
      await $this->fillBufferAsync(
        $size - Str\length($this->buffer),
        $timer->getRemainingNS(),
      );
    }

    $buffer_size = Str\length($this->buffer);
    // The loop only exits short of `$size` when EOF was hit. Test the amount
    // of buffered data rather than the EOF flag: the flag may have been set by
    // an earlier call while enough bytes for this request are still buffered.
    if ($buffer_size < $size) {
      throw new OS\BrokenPipeException(
        OS\Errno::EPIPE,
        'Reached end of file before requested size',
      );
    }

    if ($size === $buffer_size) {
      $ret = $this->buffer;
      $this->buffer = '';
      return $ret;
    }
    $ret = Str\slice($this->buffer, 0, $size);
    $this->buffer = Str\slice($this->buffer, $size);
    return $ret;
  }

  /** Read a single byte from the handle.
   *
   * Fails with EPIPE if the handle is closed or otherwise unreadable, i.e.
   * when no byte is available after attempting to fill an empty buffer.
   *
   * @param $timeout_ns timeout for the underlying read, or null; must be
   *   greater than 0 if set
   */
  public async function readByteAsync(
    ?int $timeout_ns = null,
  ): Awaitable<string> {
    _OS\arg_assert(
      $timeout_ns is null || $timeout_ns > 0,
      "Timeout must be null, or greater than 0",
    );
    if ($this->buffer === '' && !$this->eof) {
      await $this->fillBufferAsync(null, $timeout_ns);
    }
    if ($this->buffer === '') {
      _OS\throw_errno(OS\Errno::EPIPE, "Reached EOF without any more data");
    }
    $ret = $this->buffer[0];
    if ($ret === $this->buffer) {
      // That was the last buffered byte.
      $this->buffer = '';
      return $ret;
    }
    $this->buffer = Str\slice($this->buffer, 1);
    return $ret;
  }

  /** If we are known to have reached the end of the file.
   *
   * This function is best-effort: `true` is reliable, but `false` is more of
   * 'maybe'. For example, if called on an open socket with no data available,
   * it will return `false`; it is then possible that a future read will:
   * - return data if the other end sends some more
   * - block forever, or until timeout if set
   * - return the empty string if the socket closes the connection
   *
   * Additionally, helpers such as `readUntil` may fail with `EPIPE`.
   */
  public function isEndOfFile(): bool {
    // Buffered data is still readable, even if the wrapped handle has already
    // reported EOF; checking the buffer first keeps `true` reliable.
    if ($this->buffer !== '') {
      return false;
    }
    if ($this->eof) {
      return true;
    }

    // attempt to make `while (!$handle->isEOF()) {` safe on a closed file
    // handle, e.g. STDIN; if we just return `$this->eof`, the caller loop
    // body must check for EPIPE and EBADF which is unexpected.
    try {
      // Calling the non-async (but still non-blocking) version as the async
      // version could wait for the other end to send data - which could lead
      // to both ends of a pipe/socket waiting on each other.
      $this->buffer = $this->handle->readImpl();
      if ($this->buffer === '') {
        $this->eof = true;
        return true;
      }
    } catch (OS\BlockingIOException $_EWOULDBLOCK) {
      return false;
    } catch (OS\ErrnoException $ex) {
      if ($ex->getErrno() === OS\Errno::EBADF) {
        $this->eof = true;
        return true;
      }
      // ignore; they'll hit it again when they try a real read
    }
    return false;
  }

  /** Await one read on the wrapped handle and append it to the buffer.
   *
   * An empty read marks end of file; data already in the buffer is kept.
   *
   * @param $desired_bytes maximum number of bytes to request, or null
   * @param $timeout_ns timeout passed to the wrapped handle, or null
   */
  private async function fillBufferAsync(
    ?int $desired_bytes,
    ?int $timeout_ns,
  ): Awaitable<void> {
    $chunk = await $this->getHandle()
      ->readAllowPartialSuccessAsync($desired_bytes, $timeout_ns);
    if ($chunk === '') {
      $this->eof = true;
    }
    $this->buffer .= $chunk;
  }
}