in extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c [198:260]
static int ArrowIpcArrayStreamReaderNextHeader(
struct ArrowIpcArrayStreamReaderPrivate* private_data,
enum ArrowIpcMessageType message_type) {
private_data->header.size_bytes = 0;
int64_t bytes_read = 0;
// Read 8 bytes (continuation + header size in bytes)
NANOARROW_RETURN_NOT_OK_WITH_ERROR(ArrowBufferReserve(&private_data->header, 8),
&private_data->error);
NANOARROW_RETURN_NOT_OK(private_data->input.read(&private_data->input,
private_data->header.data, 8,
&bytes_read, &private_data->error));
private_data->header.size_bytes += bytes_read;
if (bytes_read == 0) {
// The caller might not use this error message (e.g., if the end of the stream
// is one of the valid outcomes) but we set the error anyway in case it gets
// propagated higher (e.g., if the stream is emtpy and there's no schema message)
ArrowErrorSet(&private_data->error, "No data available on stream");
return ENODATA;
} else if (bytes_read != 8) {
ArrowErrorSet(&private_data->error,
"Expected at least 8 bytes in remainder of stream");
return EINVAL;
}
struct ArrowBufferView input_view;
input_view.data.data = private_data->header.data;
input_view.size_bytes = private_data->header.size_bytes;
// Use PeekHeader to fill in decoder.header_size_bytes
int result =
ArrowIpcDecoderPeekHeader(&private_data->decoder, input_view, &private_data->error);
if (result == ENODATA) {
return result;
}
// Read the header bytes
int64_t expected_header_bytes = private_data->decoder.header_size_bytes - 8;
NANOARROW_RETURN_NOT_OK_WITH_ERROR(
ArrowBufferReserve(&private_data->header, expected_header_bytes),
&private_data->error);
NANOARROW_RETURN_NOT_OK(
private_data->input.read(&private_data->input, private_data->header.data + 8,
expected_header_bytes, &bytes_read, &private_data->error));
private_data->header.size_bytes += bytes_read;
// Verify + decode the header
input_view.data.data = private_data->header.data;
input_view.size_bytes = private_data->header.size_bytes;
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderVerifyHeader(&private_data->decoder, input_view,
&private_data->error));
// Don't decode the message if it's of the wrong type (because the error message
// is better communicated by the caller)
if (private_data->decoder.message_type != message_type) {
return NANOARROW_OK;
}
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeHeader(&private_data->decoder, input_view,
&private_data->error));
return NANOARROW_OK;
}