in amoro-format-mixed/amoro-mixed-trino/src/main/java/io/trino/parquet/reader/ColumnReaderFactory.java [88:367]
/**
 * Creates the {@link ColumnReader} used to read the given primitive Parquet field.
 *
 * <p>When {@code useBatchedColumnReaders} is set and the column path consists of a single
 * element, a {@link FlatColumnReader} is returned for every combination of Trino type, Parquet
 * primitive type and logical type annotation handled below. The checks are evaluated in order
 * and the first matching branch wins, so their relative order is significant. Combinations that
 * a branch recognizes but cannot decode are rejected with the exception built by {@code
 * unsupportedException}; combinations that no branch recognizes fall through to the non-batched
 * readers.
 *
 * <p>The non-batched readers are selected by switching on the Parquet primitive type, consulting
 * the logical type annotation for INT64 and FIXED_LEN_BYTE_ARRAY columns.
 *
 * @param field the primitive field to read; supplies the Trino type and the column descriptor
 * @param timeZone time zone passed to the INT96 timestamp decoders and readers
 * @param aggregatedMemoryContext parent context from which a local memory context is created
 *     for the batched readers
 * @param useBatchedColumnReaders whether {@link FlatColumnReader} may be used for flat columns
 * @return a reader for {@code field}
 */
public static ColumnReader create(
    PrimitiveField field,
    DateTimeZone timeZone,
    AggregatedMemoryContext aggregatedMemoryContext,
    boolean useBatchedColumnReaders) {
  Type type = field.getType();
  PrimitiveTypeName primitiveType =
      field.getDescriptor().getPrimitiveType().getPrimitiveTypeName();
  LogicalTypeAnnotation annotation =
      field.getDescriptor().getPrimitiveType().getLogicalTypeAnnotation();
  // Created up front; only the batched (flat) readers below receive it.
  LocalMemoryContext memoryContext =
      aggregatedMemoryContext.newLocalMemoryContext(ColumnReader.class.getSimpleName());

  // Batched path: only taken for columns whose path has exactly one element.
  if (useBatchedColumnReaders && field.getDescriptor().getPath().length == 1) {
    if (BOOLEAN.equals(type) && primitiveType == PrimitiveTypeName.BOOLEAN) {
      return new FlatColumnReader<>(
          field, ValueDecoders::getBooleanDecoder, BOOLEAN_ADAPTER, memoryContext);
    }

    // ---- Integer-like Trino types stored as INT32 ----
    if (TINYINT.equals(type) && primitiveType == INT32) {
      if (isIntegerAnnotation(annotation)) {
        return new FlatColumnReader<>(
            field, ValueDecoders::getByteDecoder, BYTE_ADAPTER, memoryContext);
      }
      throw unsupportedException(type, field);
    }
    if (SMALLINT.equals(type) && primitiveType == INT32) {
      if (isIntegerAnnotation(annotation)) {
        return new FlatColumnReader<>(
            field, ValueDecoders::getShortDecoder, SHORT_ADAPTER, memoryContext);
      }
      throw unsupportedException(type, field);
    }
    // DATE is handled before the generic AbstractIntType branch and accepts a missing
    // annotation in addition to the DATE annotation.
    if (DATE.equals(type) && primitiveType == INT32) {
      if (annotation == null || annotation instanceof DateLogicalTypeAnnotation) {
        return new FlatColumnReader<>(
            field, ValueDecoders::getIntDecoder, INT_ADAPTER, memoryContext);
      }
      throw unsupportedException(type, field);
    }
    if (type instanceof AbstractIntType && primitiveType == INT32) {
      if (isIntegerAnnotation(annotation)) {
        return new FlatColumnReader<>(
            field, ValueDecoders::getIntDecoder, INT_ADAPTER, memoryContext);
      }
      throw unsupportedException(type, field);
    }
    // Long-backed Trino type over an INT32 column: values are widened while decoding.
    if (type instanceof AbstractLongType && primitiveType == INT32) {
      if (isIntegerAnnotation(annotation)) {
        return new FlatColumnReader<>(
            field, ValueDecoders::getIntToLongDecoder, LONG_ADAPTER, memoryContext);
      }
      throw unsupportedException(type, field);
    }

    // ---- INT64-backed TIME and long types ----
    // TIME is handled before the generic AbstractLongType/INT64 branch; only MICROS is
    // supported.
    if (type instanceof TimeType && primitiveType == INT64) {
      if (annotation instanceof TimeLogicalTypeAnnotation timeAnnotation
          && timeAnnotation.getUnit() == MICROS) {
        return new FlatColumnReader<>(
            field, TransformingValueDecoders::getTimeMicrosDecoder, LONG_ADAPTER, memoryContext);
      }
      throw unsupportedException(type, field);
    }
    if (type instanceof AbstractLongType && primitiveType == INT64) {
      // BIGINT may read a timestamp-annotated INT64 column as raw long values.
      if (BIGINT.equals(type) && annotation instanceof TimestampLogicalTypeAnnotation) {
        return new FlatColumnReader<>(
            field, ValueDecoders::getLongDecoder, LONG_ADAPTER, memoryContext);
      }
      if (isIntegerAnnotation(annotation)) {
        return new FlatColumnReader<>(
            field, ValueDecoders::getLongDecoder, LONG_ADAPTER, memoryContext);
      }
      throw unsupportedException(type, field);
    }

    // ---- Floating point ----
    if (REAL.equals(type) && primitiveType == FLOAT) {
      return new FlatColumnReader<>(
          field, ValueDecoders::getRealDecoder, INT_ADAPTER, memoryContext);
    }
    if (DOUBLE.equals(type)) {
      if (primitiveType == PrimitiveTypeName.DOUBLE) {
        return new FlatColumnReader<>(
            field, ValueDecoders::getDoubleDecoder, LONG_ADAPTER, memoryContext);
      }
      if (primitiveType == FLOAT) {
        return new FlatColumnReader<>(
            field,
            TransformingValueDecoders::getFloatToDoubleDecoder,
            LONG_ADAPTER,
            memoryContext);
      }
      // Any other primitive type is not rejected here; it falls through to the checks below.
    }

    // ---- INT96 timestamps ----
    if (type instanceof TimestampType timestampType && primitiveType == INT96) {
      if (timestampType.isShort()) {
        return new FlatColumnReader<>(
            field,
            (encoding, primitiveField) ->
                getInt96ToShortTimestampDecoder(encoding, primitiveField, timeZone),
            LONG_ADAPTER,
            memoryContext);
      }
      return new FlatColumnReader<>(
          field,
          (encoding, primitiveField) ->
              getInt96ToLongTimestampDecoder(encoding, primitiveField, timeZone),
          INT96_ADAPTER,
          memoryContext);
    }
    if (type instanceof TimestampWithTimeZoneType timestampWithTimeZoneType
        && primitiveType == INT96) {
      if (timestampWithTimeZoneType.isShort()) {
        return new FlatColumnReader<>(
            field,
            TransformingValueDecoders::getInt96ToShortTimestampWithTimeZoneDecoder,
            LONG_ADAPTER,
            memoryContext);
      }
      // The non-short variant has no batched decoder here and falls through.
    }

    // ---- INT64 timestamps: a TIMESTAMP annotation is mandatory ----
    if (type instanceof TimestampType timestampType && primitiveType == INT64) {
      if (!(annotation instanceof TimestampLogicalTypeAnnotation timestampAnnotation)) {
        throw unsupportedException(type, field);
      }
      if (timestampType.isShort()) {
        return switch (timestampAnnotation.getUnit()) {
          case MILLIS -> new FlatColumnReader<>(
              field,
              TransformingValueDecoders::getInt64TimestampMillsToShortTimestampDecoder,
              LONG_ADAPTER,
              memoryContext);
          case MICROS -> new FlatColumnReader<>(
              field,
              TransformingValueDecoders::getInt64TimestampMicrosToShortTimestampDecoder,
              LONG_ADAPTER,
              memoryContext);
          case NANOS -> new FlatColumnReader<>(
              field,
              TransformingValueDecoders::getInt64TimestampNanosToShortTimestampDecoder,
              LONG_ADAPTER,
              memoryContext);
        };
      }
      return switch (timestampAnnotation.getUnit()) {
        case MILLIS -> new FlatColumnReader<>(
            field,
            TransformingValueDecoders::getInt64TimestampMillisToLongTimestampDecoder,
            INT96_ADAPTER,
            memoryContext);
        case MICROS -> new FlatColumnReader<>(
            field,
            TransformingValueDecoders::getInt64TimestampMicrosToLongTimestampDecoder,
            INT96_ADAPTER,
            memoryContext);
        case NANOS -> new FlatColumnReader<>(
            field,
            TransformingValueDecoders::getInt64TimestampNanosToLongTimestampDecoder,
            INT96_ADAPTER,
            memoryContext);
      };
    }
    if (type instanceof TimestampWithTimeZoneType timestampWithTimeZoneType
        && primitiveType == INT64) {
      if (!(annotation instanceof TimestampLogicalTypeAnnotation timestampAnnotation)) {
        throw unsupportedException(type, field);
      }
      if (timestampWithTimeZoneType.isShort()) {
        return switch (timestampAnnotation.getUnit()) {
          case MILLIS -> new FlatColumnReader<>(
              field,
              TransformingValueDecoders
                  ::getInt64TimestampMillsToShortTimestampWithTimeZoneDecoder,
              LONG_ADAPTER,
              memoryContext);
          case MICROS -> new FlatColumnReader<>(
              field,
              TransformingValueDecoders
                  ::getInt64TimestampMicrosToShortTimestampWithTimeZoneDecoder,
              LONG_ADAPTER,
              memoryContext);
          case NANOS -> throw unsupportedException(type, field);
        };
      }
      // Non-short timestamp with time zone: only MICROS is supported.
      return switch (timestampAnnotation.getUnit()) {
        case MILLIS, NANOS -> throw unsupportedException(type, field);
        case MICROS -> new FlatColumnReader<>(
            field,
            TransformingValueDecoders::getInt64TimestampMicrosToLongTimestampWithTimeZoneDecoder,
            INT96_ADAPTER,
            memoryContext);
      };
    }

    // ---- Decimals: the batched decoders are used only when isDecimalRescaled(...) is false;
    // otherwise the column falls through to the non-batched readers ----
    if (type instanceof DecimalType decimalType
        && decimalType.isShort()
        && (primitiveType == INT32
            || primitiveType == INT64
            || primitiveType == FIXED_LEN_BYTE_ARRAY)) {
      if (annotation instanceof DecimalLogicalTypeAnnotation decimalAnnotation
          && !isDecimalRescaled(decimalAnnotation, decimalType)) {
        return new FlatColumnReader<>(
            field, ValueDecoders::getShortDecimalDecoder, LONG_ADAPTER, memoryContext);
      }
    }
    if (type instanceof DecimalType decimalType
        && !decimalType.isShort()
        && (primitiveType == BINARY || primitiveType == FIXED_LEN_BYTE_ARRAY)) {
      if (annotation instanceof DecimalLogicalTypeAnnotation decimalAnnotation
          && !isDecimalRescaled(decimalAnnotation, decimalType)) {
        return new FlatColumnReader<>(
            field, ValueDecoders::getLongDecimalDecoder, INT128_ADAPTER, memoryContext);
      }
    }

    // ---- BINARY-backed types, from most to least specific ----
    if (type instanceof VarcharType varcharType
        && !varcharType.isUnbounded()
        && primitiveType == BINARY) {
      return new FlatColumnReader<>(
          field, ValueDecoders::getBoundedVarcharBinaryDecoder, BINARY_ADAPTER, memoryContext);
    }
    if (type instanceof CharType && primitiveType == BINARY) {
      return new FlatColumnReader<>(
          field, ValueDecoders::getCharBinaryDecoder, BINARY_ADAPTER, memoryContext);
    }
    if (type instanceof AbstractVariableWidthType && primitiveType == BINARY) {
      return new FlatColumnReader<>(
          field, ValueDecoders::getBinaryDecoder, BINARY_ADAPTER, memoryContext);
    }

    if (UUID.equals(type) && primitiveType == FIXED_LEN_BYTE_ARRAY) {
      // Iceberg 0.11.1 writes UUID as FIXED_LEN_BYTE_ARRAY without logical type annotation (see
      // https://github.com/apache/iceberg/pull/2913)
      // To support such files, we bet on the logical type to be UUID based on the Trino UUID type
      // check.
      if (annotation == null || isLogicalUuid(annotation)) {
        return new FlatColumnReader<>(
            field, ValueDecoders::getUuidDecoder, INT128_ADAPTER, memoryContext);
      }
      throw unsupportedException(type, field);
    }
  }

  // Non-batched path: choose the reader from the Parquet primitive type. The default readers
  // are supplied lazily (orElseGet) so they are only constructed when no decimal reader applies.
  return switch (primitiveType) {
    case BOOLEAN -> new BooleanColumnReader(field);
    case INT32 -> createDecimalColumnReader(field).orElseGet(() -> new IntColumnReader(field));
    case INT64 -> {
      if (annotation instanceof TimeLogicalTypeAnnotation timeAnnotation) {
        if (timeAnnotation.getUnit() == MICROS) {
          yield new TimeMicrosColumnReader(field);
        }
        throw unsupportedException(type, field);
      }
      if (annotation instanceof TimestampLogicalTypeAnnotation timestampAnnotation) {
        if (timestampAnnotation.getUnit() == MILLIS) {
          yield new Int64TimestampMillisColumnReader(field);
        }
        if (timestampAnnotation.getUnit() == MICROS) {
          yield new TimestampMicrosColumnReader(field);
        }
        if (timestampAnnotation.getUnit() == NANOS) {
          yield new Int64TimestampNanosColumnReader(field);
        }
        throw unsupportedException(type, field);
      }
      yield createDecimalColumnReader(field).orElseGet(() -> new LongColumnReader(field));
    }
    case INT96 -> new TimestampColumnReader(field, timeZone);
    case FLOAT -> new FloatColumnReader(field);
    case DOUBLE -> new DoubleColumnReader(field);
    case BINARY -> createDecimalColumnReader(field).orElseGet(() -> new BinaryColumnReader(field));
    case FIXED_LEN_BYTE_ARRAY -> {
      Optional<PrimitiveColumnReader> decimalColumnReader = createDecimalColumnReader(field);
      if (decimalColumnReader.isPresent()) {
        yield decimalColumnReader.get();
      }
      // Iceberg 0.11.1 writes UUID as FIXED_LEN_BYTE_ARRAY without logical type annotation
      // (see https://github.com/apache/iceberg/pull/2913)
      // To support such files, a missing annotation is treated like the UUID annotation: we bet
      // on the type to be UUID, which gets verified later, when reading the column data.
      if (annotation == null || isLogicalUuid(annotation)) {
        yield new UuidColumnReader(field);
      }
      throw unsupportedException(type, field);
    }
  };
}