in cpp-ch/local-engine/Storages/ch_parquet/arrow/reader_internal.cc [740:866]
// Routes a decimal column to the transfer routine that matches the Parquet
// physical type reported by `descr`.
//
// `DecimalArrayType` selects the Arrow decimal array to build, and `type_name`
// is only used to compose the error message for unsupported physical types.
// Returns whatever Status the selected transfer routine returns, or
// Status::Invalid when the physical type is not one of INT32, INT64,
// BYTE_ARRAY or FIXED_LEN_BYTE_ARRAY.
template <typename DecimalArrayType>
static Status TransferDecimalColumn(RecordReader* reader, MemoryPool* pool,
                                    const std::shared_ptr<Field>& value_field,
                                    const ColumnDescriptor* descr, const char* type_name,
                                    Datum* out) {
  switch (descr->physical_type()) {
    case ::parquet::Type::INT32:
      return DecimalIntegerTransfer<DecimalArrayType, Int32Type>(reader, pool,
                                                                 value_field, out);
    case ::parquet::Type::INT64:
      return DecimalIntegerTransfer<DecimalArrayType, Int64Type>(reader, pool,
                                                                 value_field, out);
    case ::parquet::Type::BYTE_ARRAY:
      return TransferDecimal<DecimalArrayType, ByteArrayType>(reader, pool, value_field,
                                                              out);
    case ::parquet::Type::FIXED_LEN_BYTE_ARRAY:
      return TransferDecimal<DecimalArrayType, FLBAType>(reader, pool, value_field, out);
    default:
      return Status::Invalid("Physical type for ", type_name,
                             " must be int32, int64, byte array, or fixed "
                             "length binary");
  }
}

// Moves the values accumulated in `reader` into `*out` as a ChunkedArray whose
// layout is chosen from the Arrow type of `value_field`.
//
// \param reader       record reader holding the decoded column values
// \param value_field  Arrow field describing the requested output type
// \param descr        Parquet column descriptor; its physical type is consulted
//                     for decimal and timestamp columns
// \param pool         memory pool forwarded to the transfer helpers that take one
// \param out          receives the resulting ChunkedArray on success
// \return Status::OK on success; the failing helper's Status otherwise;
//         Status::NotImplemented for unsupported Arrow types or timestamp units;
//         Status::Invalid for an unsupported decimal physical type or an
//         unexpected result kind.
Status TransferColumnData(RecordReader* reader, const std::shared_ptr<Field>& value_field,
                          const ColumnDescriptor* descr, MemoryPool* pool,
                          std::shared_ptr<ChunkedArray>* out) {
  // Helpers produce either a single array (stored directly in `result`) or a
  // ChunkedArray (staged in `chunked_result` and then wrapped in `result`).
  Datum result;
  std::shared_ptr<ChunkedArray> chunked_result;
  switch (value_field->type()->id()) {
    case ::arrow::Type::DICTIONARY: {
      RETURN_NOT_OK(TransferDictionary(reader, value_field->type(),
                                       value_field->nullable(), &chunked_result));
      result = chunked_result;
    } break;
    case ::arrow::Type::NA: {
      result = std::make_shared<::arrow::NullArray>(reader->values_written());
    } break;
    case ::arrow::Type::INT32:
    case ::arrow::Type::INT64:
    case ::arrow::Type::FLOAT:
    case ::arrow::Type::DOUBLE:
    case ::arrow::Type::DATE32:
      result = TransferZeroCopy(reader, value_field);
      break;
    case ::arrow::Type::BOOL:
      RETURN_NOT_OK(TransferBool(reader, value_field->nullable(), pool, &result));
      break;
      // NOTE(review): TRANSFER_INT32 / TRANSFER_INT64 are macros defined outside
      // this block. They sit in case-label position, so they are expected to
      // expand to complete `case` clauses that use the local names `reader`,
      // `pool`, `value_field` and `result` -- confirm before renaming any of them.
      TRANSFER_INT32(UINT8, ::arrow::UInt8Type);
      TRANSFER_INT32(INT8, ::arrow::Int8Type);
      TRANSFER_INT32(UINT16, ::arrow::UInt16Type);
      TRANSFER_INT32(INT16, ::arrow::Int16Type);
      TRANSFER_INT32(UINT32, ::arrow::UInt32Type);
      TRANSFER_INT64(UINT64, ::arrow::UInt64Type);
      TRANSFER_INT32(TIME32, ::arrow::Time32Type);
      TRANSFER_INT64(TIME64, ::arrow::Time64Type);
      TRANSFER_INT64(DURATION, ::arrow::DurationType);
    case ::arrow::Type::DATE64:
      RETURN_NOT_OK(TransferDate64(reader, pool, value_field, &result));
      break;
    case ::arrow::Type::FIXED_SIZE_BINARY:
    case ::arrow::Type::BINARY:
    case ::arrow::Type::STRING:
    case ::arrow::Type::LARGE_BINARY:
    case ::arrow::Type::LARGE_STRING: {
      RETURN_NOT_OK(TransferBinary(reader, pool, value_field, &chunked_result));
      result = chunked_result;
    } break;
    case ::arrow::Type::DECIMAL128:
      RETURN_NOT_OK(TransferDecimalColumn<Decimal128Array>(reader, pool, value_field,
                                                           descr, "decimal128", &result));
      break;
    case ::arrow::Type::DECIMAL256:
      RETURN_NOT_OK(TransferDecimalColumn<Decimal256Array>(reader, pool, value_field,
                                                           descr, "decimal256", &result));
      break;
    case ::arrow::Type::TIMESTAMP: {
      const ::arrow::TimestampType& timestamp_type =
          checked_cast<::arrow::TimestampType&>(*value_field->type());
      if (descr->physical_type() == ::parquet::Type::INT96) {
        // INT96 timestamps go through a dedicated routine that is given the
        // target unit.
        RETURN_NOT_OK(
            TransferInt96(reader, pool, value_field, &result, timestamp_type.unit()));
      } else {
        switch (timestamp_type.unit()) {
          case ::arrow::TimeUnit::MILLI:
          case ::arrow::TimeUnit::MICRO:
          case ::arrow::TimeUnit::NANO:
            result = TransferZeroCopy(reader, value_field);
            break;
          default:
            return Status::NotImplemented("TimeUnit not supported");
        }
      }
    } break;
    default:
      return Status::NotImplemented("No support for reading columns of type ",
                                    value_field->type()->ToString());
  }

  if (result.kind() == Datum::ARRAY) {
    *out = std::make_shared<ChunkedArray>(result.make_array());
    return Status::OK();
  }
  if (result.kind() == Datum::CHUNKED_ARRAY) {
    *out = result.chunked_array();
    return Status::OK();
  }

  // Every branch above is expected to leave an array or chunked array in
  // `result`. Keep the debug assertion, but also report an error so that builds
  // where DCHECK is compiled out do not return OK with `*out` left unassigned.
  DCHECK(false) << "Should be impossible, result was " << result.ToString();
  return Status::Invalid("Unexpected result kind while transferring column data: ",
                         result.ToString());
}