in src/parquet/arrow/reader.cc [1305:1387]
Status PrimitiveImpl::NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) {
try {
// Pre-allocation gives much better performance for flat columns
record_reader_->Reserve(records_to_read);
record_reader_->Reset();
while (records_to_read > 0) {
if (!record_reader_->HasMoreData()) {
break;
}
int64_t records_read = record_reader_->ReadRecords(records_to_read);
records_to_read -= records_read;
if (records_read == 0) {
NextRowGroup();
}
}
} catch (const ::parquet::ParquetException& e) {
return ::arrow::Status::IOError(e.what());
}
switch (field_->type()->id()) {
TRANSFER_CASE(BOOL, ::arrow::BooleanType, BooleanType)
TRANSFER_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
TRANSFER_CASE(INT8, ::arrow::Int8Type, Int32Type)
TRANSFER_CASE(UINT16, ::arrow::UInt16Type, Int32Type)
TRANSFER_CASE(INT16, ::arrow::Int16Type, Int32Type)
TRANSFER_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
TRANSFER_CASE(INT32, ::arrow::Int32Type, Int32Type)
TRANSFER_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
TRANSFER_CASE(INT64, ::arrow::Int64Type, Int64Type)
TRANSFER_CASE(FLOAT, ::arrow::FloatType, FloatType)
TRANSFER_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
TRANSFER_CASE(STRING, ::arrow::StringType, ByteArrayType)
TRANSFER_CASE(BINARY, ::arrow::BinaryType, ByteArrayType)
TRANSFER_CASE(DATE32, ::arrow::Date32Type, Int32Type)
TRANSFER_CASE(DATE64, ::arrow::Date64Type, Int32Type)
TRANSFER_CASE(FIXED_SIZE_BINARY, ::arrow::FixedSizeBinaryType, FLBAType)
case ::arrow::Type::NA: {
*out = std::make_shared<::arrow::NullArray>(record_reader_->values_written());
RETURN_NOT_OK(WrapIntoListArray<Int32Type>(out));
break;
}
case ::arrow::Type::DECIMAL: {
switch (descr_->physical_type()) {
case ::parquet::Type::INT32: {
TRANSFER_DATA(::arrow::Decimal128Type, Int32Type);
} break;
case ::parquet::Type::INT64: {
TRANSFER_DATA(::arrow::Decimal128Type, Int64Type);
} break;
case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
TRANSFER_DATA(::arrow::Decimal128Type, FLBAType);
} break;
default:
return Status::Invalid(
"Physical type for decimal must be int32, int64, or fixed length binary");
}
} break;
case ::arrow::Type::TIMESTAMP: {
::arrow::TimestampType* timestamp_type =
static_cast<::arrow::TimestampType*>(field_->type().get());
switch (timestamp_type->unit()) {
case ::arrow::TimeUnit::MILLI:
case ::arrow::TimeUnit::MICRO: {
TRANSFER_DATA(::arrow::TimestampType, Int64Type);
} break;
case ::arrow::TimeUnit::NANO: {
TRANSFER_DATA(::arrow::TimestampType, Int96Type);
} break;
default:
return Status::NotImplemented("TimeUnit not supported");
}
} break;
TRANSFER_CASE(TIME32, ::arrow::Time32Type, Int32Type)
TRANSFER_CASE(TIME64, ::arrow::Time64Type, Int64Type)
default:
std::stringstream ss;
ss << "No support for reading columns of type " << field_->type()->ToString();
return Status::NotImplemented(ss.str());
}
return Status::OK();
}