RawObject FUNC()

in runtime/under-io-module.cpp [632:770]


// Reads a single line from a BufferedReader. `args.get(0)` is the reader and
// `args.get(1)` is the optional size limit (None or -1 mean "no limit").
//
// The returned bytes end right after the first '\n' found, after `limit`
// bytes, or when fillBuffer() returns something other than Unbound
// (presumably EOF / no more data -- confirm against fillBuffer()), whichever
// comes first.
//
// Error results:
//   - the result of raiseRequiresType() if self is not a BufferedReader;
//   - whatever intFromIndex() produced for a size it could not convert;
//   - OverflowError if the size is neither a SmallInt nor a Bool;
//   - ValueError if the size is negative and not -1.
RawObject FUNC(_io, _buffered_reader_readline)(Thread* thread, Arguments args) {
  // TODO(T58490915): Investigate what thread safety guarantees Python has,
  // and add locking code as necessary.
  HandleScope scope(thread);
  Runtime* runtime = thread->runtime();
  Object self_obj(&scope, args.get(0));
  if (!runtime->isInstanceOfBufferedReader(*self_obj)) {
    return thread->raiseRequiresType(self_obj, ID(BufferedReader));
  }
  BufferedReader self(&scope, *self_obj);

  // `limit` is the number of bytes that may still be added to the line; it is
  // decremented as bytes without a newline are scanned.
  Object limit_obj(&scope, args.get(1));
  word limit = kMaxWord;
  if (!limit_obj.isNoneType()) {
    // TODO(T59004416) Is there a way to push intFromIndex() towards managed?
    Object index_obj(&scope, intFromIndex(thread, limit_obj));
    if (index_obj.isErrorException()) {
      return *index_obj;
    }
    Int index(&scope, intUnderlying(*index_obj));
    if (!(index.isSmallInt() || index.isBool())) {
      return thread->raiseWithFmt(
          LayoutId::kOverflowError,
          "cannot fit value into an index-sized integer");
    }
    word requested = index.asWord();
    if (requested < -1) {
      return thread->raiseWithFmt(LayoutId::kValueError,
                                  "read length must be non-negative or -1");
    }
    limit = (requested == -1) ? kMaxWord : requested;
  }

  // Fast path: try to satisfy the request from bytes that are already
  // buffered, without touching the underlying file.
  word buffered_end = self.bufferNumBytes();
  word start = self.readPos();
  word pending = buffered_end - start;
  if (pending > 0) {
    MutableBytes window(&scope, self.readBuf());
    word stop = -1;
    word span = pending;
    if (pending < limit) {
      // Everything buffered fits under the limit; charge it in case no newline
      // is found and we have to keep reading below.
      limit -= pending;
    } else {
      // The limit ends inside the buffered bytes: the line stops there at the
      // latest.
      span = limit;
      stop = start + limit;
    }
    word newline_at = window.findByte('\n', start, span);
    if (newline_at >= 0) {
      stop = newline_at + 1;
    }
    if (stop >= 0) {
      self.setReadPos(stop);
      Bytes window_bytes(&scope, *window);
      return bytesSubseq(thread, window_bytes, start, stop - start);
    }
  }

  // Slow path: more data is needed. NOTE(review): this relies on
  // rewindOrInitReadBuf() leaving the still-unread bytes at the front of the
  // buffer and updating bufferNumBytes() accordingly -- confirm.
  MutableBytes buf(&scope, rewindOrInitReadBuf(thread, self));
  word filled = self.bufferNumBytes();
  word capacity = self.bufferSize();

  Object raw_file(&scope, self.underlying());
  Object fill_result(&scope, NoneType::object());
  Object chunks(&scope, NoneType::object());
  word line_end = -1;
  // Each pass of the outer loop handles one buffer. Lines longer than a single
  // buffer have their full buffers collected in the `chunks` list.
  while (true) {
    // Keep filling the current buffer until the line ends or the buffer is
    // full.
    do {
      word scan_start = filled;
      fill_result = fillBuffer(thread, raw_file, buf, &filled);
      if (fill_result.isErrorException()) return *fill_result;
      if (!fill_result.isUnbound()) {
        // Nothing collected at all: hand the fill result back unchanged.
        if (filled == 0 && chunks.isNoneType()) return *fill_result;
        line_end = filled;
        break;
      }

      // Only the bytes added by this fill still need to be scanned.
      word fresh = filled - scan_start;
      if (fresh < limit) {
        limit -= fresh;
      } else {
        fresh = limit;
        line_end = scan_start + limit;
      }
      word newline_at = buf.findByte('\n', scan_start, fresh);
      if (newline_at >= 0) {
        line_end = newline_at + 1;
        break;
      }
    } while (line_end < 0 && filled < capacity);

    if (line_end >= 0) break;

    // The buffer is full and the line has not ended yet: stash the buffer in
    // `chunks` and continue with a fresh one.
    if (chunks.isNoneType()) {
      chunks = runtime->newList();
    }
    List list(&scope, *chunks);
    runtime->listAdd(thread, list, buf);

    buf = initReadBuf(thread, self);
    filled = 0;
  }

  // Total size: all stashed buffers plus the used prefix of the current one.
  word total = line_end;
  if (!chunks.isNoneType()) {
    List list(&scope, *chunks);
    word num_chunks = list.numItems();
    for (word i = 0; i < num_chunks; i++) {
      total += MutableBytes::cast(list.at(i)).length();
    }
  }
  MutableBytes result(&scope, runtime->newMutableBytesUninitialized(total));
  word offset = 0;
  if (!chunks.isNoneType()) {
    List list(&scope, *chunks);
    Bytes chunk(&scope, Bytes::empty());
    word num_chunks = list.numItems();
    for (word i = 0; i < num_chunks; i++) {
      chunk = list.at(i);
      word chunk_length = chunk.length();
      result.replaceFromWithBytes(offset, *chunk, chunk_length);
      offset += chunk_length;
    }
  }
  result.replaceFromWith(offset, *buf, line_end);
  DCHECK(offset + line_end == total, "length mismatch");
  // Bytes past `line_end` in the current buffer stay available for later
  // reads.
  self.setReadPos(line_end);
  self.setBufferNumBytes(filled);
  return result.becomeImmutable();
}