static int ArrowIpcArrayStreamReaderNextHeader()

in extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c [198:260]


static int ArrowIpcArrayStreamReaderNextHeader(
    struct ArrowIpcArrayStreamReaderPrivate* private_data,
    enum ArrowIpcMessageType message_type) {
  private_data->header.size_bytes = 0;
  int64_t bytes_read = 0;

  // Read 8 bytes (continuation + header size in bytes)
  NANOARROW_RETURN_NOT_OK_WITH_ERROR(ArrowBufferReserve(&private_data->header, 8),
                                     &private_data->error);
  NANOARROW_RETURN_NOT_OK(private_data->input.read(&private_data->input,
                                                   private_data->header.data, 8,
                                                   &bytes_read, &private_data->error));
  private_data->header.size_bytes += bytes_read;

  if (bytes_read == 0) {
    // The caller might not use this error message (e.g., if the end of the stream
    // is one of the valid outcomes) but we set the error anyway in case it gets
    // propagated higher (e.g., if the stream is emtpy and there's no schema message)
    ArrowErrorSet(&private_data->error, "No data available on stream");
    return ENODATA;
  } else if (bytes_read != 8) {
    ArrowErrorSet(&private_data->error,
                  "Expected at least 8 bytes in remainder of stream");
    return EINVAL;
  }

  struct ArrowBufferView input_view;
  input_view.data.data = private_data->header.data;
  input_view.size_bytes = private_data->header.size_bytes;

  // Use PeekHeader to fill in decoder.header_size_bytes
  int result =
      ArrowIpcDecoderPeekHeader(&private_data->decoder, input_view, &private_data->error);
  if (result == ENODATA) {
    return result;
  }

  // Read the header bytes
  int64_t expected_header_bytes = private_data->decoder.header_size_bytes - 8;
  NANOARROW_RETURN_NOT_OK_WITH_ERROR(
      ArrowBufferReserve(&private_data->header, expected_header_bytes),
      &private_data->error);
  NANOARROW_RETURN_NOT_OK(
      private_data->input.read(&private_data->input, private_data->header.data + 8,
                               expected_header_bytes, &bytes_read, &private_data->error));
  private_data->header.size_bytes += bytes_read;

  // Verify + decode the header
  input_view.data.data = private_data->header.data;
  input_view.size_bytes = private_data->header.size_bytes;
  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderVerifyHeader(&private_data->decoder, input_view,
                                                      &private_data->error));

  // Don't decode the message if it's of the wrong type (because the error message
  // is better communicated by the caller)
  if (private_data->decoder.message_type != message_type) {
    return NANOARROW_OK;
  }

  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeHeader(&private_data->decoder, input_view,
                                                      &private_data->error));
  return NANOARROW_OK;
}