in cpp/src/parquet/arrow/reader_internal.cc [832:985]
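// Materializes the values accumulated in `reader` into an Arrow ChunkedArray
// whose type matches `value_field`, dispatching on the destination Arrow type.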
Status TransferColumnData(RecordReader* reader,
std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
const std::shared_ptr<Field>& value_field,
const ColumnDescriptor* descr, const ReaderContext* ctx,
std::shared_ptr<ChunkedArray>* out) {
auto pool = ctx->pool;
Datum result;
std::shared_ptr<ChunkedArray> chunked_result;
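// Each branch below leaves either a plain Array or a ChunkedArray in `result`;
// the tail of the function normalizes both into a ChunkedArray.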
switch (value_field->type()->id()) {
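// Dictionary-encoded data arrives as a ChunkedArray of DictionaryArray chunks.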
case ::arrow::Type::DICTIONARY: {
RETURN_NOT_OK(TransferDictionary(reader, pool, value_field->type(),
value_field->nullable(), &chunked_result));
result = chunked_result;
} break;
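// Null columns carry no values; an all-null array of the right length suffices.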
case ::arrow::Type::NA: {
result = std::make_shared<::arrow::NullArray>(reader->values_written());
break;
}
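// Primitive types whose Arrow layout matches the Parquet physical layout
// reuse the reader's buffers without copying.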
case ::arrow::Type::INT32:
result = TransferZeroCopy<::arrow::Int32Type, Int32Type>(
reader, std::move(metadata), ctx, value_field);
break;
case ::arrow::Type::INT64:
result = TransferZeroCopy<::arrow::Int64Type, Int64Type>(
reader, std::move(metadata), ctx, value_field);
break;
case ::arrow::Type::FLOAT:
result = TransferZeroCopy<::arrow::FloatType, FloatType>(
reader, std::move(metadata), ctx, value_field);
break;
case ::arrow::Type::DOUBLE:
result = TransferZeroCopy<::arrow::DoubleType, DoubleType>(
reader, std::move(metadata), ctx, value_field);
break;
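// Booleans must be repacked into Arrow's bit-packed layout, so no zero copy.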
case ::arrow::Type::BOOL:
RETURN_NOT_OK(TransferBool(reader, std::move(metadata), ctx,
value_field->nullable(), &result));
break;
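// These macros expand to case labels that convert element-wise from the
// INT32/INT64 physical values into the corresponding Arrow integer or
// temporal type.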
TRANSFER_INT32(UINT8, ::arrow::UInt8Type);
TRANSFER_INT32(INT8, ::arrow::Int8Type);
TRANSFER_INT32(UINT16, ::arrow::UInt16Type);
TRANSFER_INT32(INT16, ::arrow::Int16Type);
TRANSFER_INT32(UINT32, ::arrow::UInt32Type);
TRANSFER_INT64(UINT64, ::arrow::UInt64Type);
TRANSFER_INT32(DATE32, ::arrow::Date32Type);
TRANSFER_INT32(TIME32, ::arrow::Time32Type);
TRANSFER_INT64(TIME64, ::arrow::Time64Type);
TRANSFER_INT64(DURATION, ::arrow::DurationType);
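// Parquet stores dates as INT32 days since the epoch; widen to Arrow's
// millisecond-based date64.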
case ::arrow::Type::DATE64:
RETURN_NOT_OK(TransferDate64(reader, pool, value_field, &result));
break;
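// Binary-like data is transferred chunk by chunk, so the result is already
// a ChunkedArray.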
case ::arrow::Type::FIXED_SIZE_BINARY:
case ::arrow::Type::BINARY:
case ::arrow::Type::STRING:
case ::arrow::Type::LARGE_BINARY:
case ::arrow::Type::LARGE_STRING: {
RETURN_NOT_OK(TransferBinary(reader, pool, value_field, &chunked_result));
result = chunked_result;
} break;
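// Half floats are stored as 2-byte FIXED_LEN_BYTE_ARRAY values; validate the
// physical type and width before converting.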
case ::arrow::Type::HALF_FLOAT: {
const auto& type = *value_field->type();
if (descr->physical_type() != ::parquet::Type::FIXED_LEN_BYTE_ARRAY) {
return Status::Invalid("Physical type for ", type.ToString(),
" must be fixed length binary");
}
if (descr->type_length() != type.byte_width()) {
return Status::Invalid("Fixed length binary type for ", type.ToString(),
" must have a byte width of ", type.byte_width());
}
RETURN_NOT_OK(TransferHalfFloat(reader, pool, value_field, &result));
} break;
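// Decimals are converted according to their physical storage: INT32/INT64
// for small precisions, BYTE_ARRAY or FIXED_LEN_BYTE_ARRAY otherwise.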
case ::arrow::Type::DECIMAL128: {
switch (descr->physical_type()) {
case ::parquet::Type::INT32: {
auto fn = &DecimalIntegerTransfer<Decimal128Array, Int32Type>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::INT64: {
auto fn = &DecimalIntegerTransfer<Decimal128Array, Int64Type>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::BYTE_ARRAY: {
auto fn = &TransferDecimal<Decimal128Array, ByteArrayType>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
auto fn = &TransferDecimal<Decimal128Array, FLBAType>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
default:
return Status::Invalid(
"Physical type for decimal128 must be int32, int64, byte array, or fixed "
"length binary");
}
} break;
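// Same physical-type dispatch as decimal128, producing Decimal256 values.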
case ::arrow::Type::DECIMAL256:
switch (descr->physical_type()) {
case ::parquet::Type::INT32: {
auto fn = &DecimalIntegerTransfer<Decimal256Array, Int32Type>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::INT64: {
auto fn = &DecimalIntegerTransfer<Decimal256Array, Int64Type>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::BYTE_ARRAY: {
auto fn = &TransferDecimal<Decimal256Array, ByteArrayType>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
auto fn = &TransferDecimal<Decimal256Array, FLBAType>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
default:
return Status::Invalid(
"Physical type for decimal256 must be int32, int64, byte array, or fixed "
"length binary");
}
break;
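// INT96 timestamps (the legacy Impala/Hive encoding) need explicit
// conversion; INT64-backed timestamps transfer zero-copy for all supported
// units.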
case ::arrow::Type::TIMESTAMP: {
const ::arrow::TimestampType& timestamp_type =
checked_cast<::arrow::TimestampType&>(*value_field->type());
if (descr->physical_type() == ::parquet::Type::INT96) {
RETURN_NOT_OK(
TransferInt96(reader, pool, value_field, &result, timestamp_type.unit()));
} else {
switch (timestamp_type.unit()) {
case ::arrow::TimeUnit::MILLI:
case ::arrow::TimeUnit::MICRO:
case ::arrow::TimeUnit::NANO:
result = TransferZeroCopy<::arrow::Int64Type, Int64Type>(
reader, std::move(metadata), ctx, value_field);
break;
default:
return Status::NotImplemented("TimeUnit not supported");
}
}
} break;
default:
return Status::NotImplemented("No support for reading columns of type ",
value_field->type()->ToString());
}
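// Normalize the result: wrap a plain Array into a single-chunk ChunkedArray.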
if (result.kind() == Datum::ARRAY) {
*out = std::make_shared<ChunkedArray>(result.make_array());
} else if (result.kind() == Datum::CHUNKED_ARRAY) {
*out = result.chunked_array();
} else {
DCHECK(false) << "Should be impossible, result was " << result.ToString();
}
return Status::OK();
}