public static ColumnReader create()

in amoro-format-mixed/amoro-mixed-trino/src/main/java/io/trino/parquet/reader/ColumnReaderFactory.java [88:367]


  public static ColumnReader create(
      PrimitiveField field,
      DateTimeZone timeZone,
      AggregatedMemoryContext aggregatedMemoryContext,
      boolean useBatchedColumnReaders) {
    Type type = field.getType();
    PrimitiveTypeName primitiveType =
        field.getDescriptor().getPrimitiveType().getPrimitiveTypeName();
    LogicalTypeAnnotation annotation =
        field.getDescriptor().getPrimitiveType().getLogicalTypeAnnotation();
    LocalMemoryContext memoryContext =
        aggregatedMemoryContext.newLocalMemoryContext(ColumnReader.class.getSimpleName());
    if (useBatchedColumnReaders && field.getDescriptor().getPath().length == 1) {
      if (BOOLEAN.equals(type) && primitiveType == PrimitiveTypeName.BOOLEAN) {
        return new FlatColumnReader<>(
            field, ValueDecoders::getBooleanDecoder, BOOLEAN_ADAPTER, memoryContext);
      }
      if (TINYINT.equals(type) && primitiveType == INT32) {
        if (isIntegerAnnotation(annotation)) {
          return new FlatColumnReader<>(
              field, ValueDecoders::getByteDecoder, BYTE_ADAPTER, memoryContext);
        }
        throw unsupportedException(type, field);
      }
      if (SMALLINT.equals(type) && primitiveType == INT32) {
        if (isIntegerAnnotation(annotation)) {
          return new FlatColumnReader<>(
              field, ValueDecoders::getShortDecoder, SHORT_ADAPTER, memoryContext);
        }
        throw unsupportedException(type, field);
      }
      if (DATE.equals(type) && primitiveType == INT32) {
        if (annotation == null || annotation instanceof DateLogicalTypeAnnotation) {
          return new FlatColumnReader<>(
              field, ValueDecoders::getIntDecoder, INT_ADAPTER, memoryContext);
        }
        throw unsupportedException(type, field);
      }
      if (type instanceof AbstractIntType && primitiveType == INT32) {
        if (isIntegerAnnotation(annotation)) {
          return new FlatColumnReader<>(
              field, ValueDecoders::getIntDecoder, INT_ADAPTER, memoryContext);
        }
        throw unsupportedException(type, field);
      }
      if (type instanceof AbstractLongType && primitiveType == INT32) {
        if (isIntegerAnnotation(annotation)) {
          return new FlatColumnReader<>(
              field, ValueDecoders::getIntToLongDecoder, LONG_ADAPTER, memoryContext);
        }
        throw unsupportedException(type, field);
      }
      if (type instanceof TimeType && primitiveType == INT64) {
        if (annotation instanceof TimeLogicalTypeAnnotation timeAnnotation
            && timeAnnotation.getUnit() == MICROS) {
          return new FlatColumnReader<>(
              field, TransformingValueDecoders::getTimeMicrosDecoder, LONG_ADAPTER, memoryContext);
        }
        throw unsupportedException(type, field);
      }
      if (type instanceof AbstractLongType && primitiveType == INT64) {
        if (BIGINT.equals(type) && annotation instanceof TimestampLogicalTypeAnnotation) {
          return new FlatColumnReader<>(
              field, ValueDecoders::getLongDecoder, LONG_ADAPTER, memoryContext);
        }
        if (isIntegerAnnotation(annotation)) {
          return new FlatColumnReader<>(
              field, ValueDecoders::getLongDecoder, LONG_ADAPTER, memoryContext);
        }
        throw unsupportedException(type, field);
      }
      if (REAL.equals(type) && primitiveType == FLOAT) {
        return new FlatColumnReader<>(
            field, ValueDecoders::getRealDecoder, INT_ADAPTER, memoryContext);
      }
      if (DOUBLE.equals(type)) {
        if (primitiveType == PrimitiveTypeName.DOUBLE) {
          return new FlatColumnReader<>(
              field, ValueDecoders::getDoubleDecoder, LONG_ADAPTER, memoryContext);
        }
        if (primitiveType == FLOAT) {
          return new FlatColumnReader<>(
              field,
              TransformingValueDecoders::getFloatToDoubleDecoder,
              LONG_ADAPTER,
              memoryContext);
        }
      }
      if (type instanceof TimestampType timestampType && primitiveType == INT96) {
        if (timestampType.isShort()) {
          return new FlatColumnReader<>(
              field,
              (encoding, primitiveField) ->
                  getInt96ToShortTimestampDecoder(encoding, primitiveField, timeZone),
              LONG_ADAPTER,
              memoryContext);
        }
        return new FlatColumnReader<>(
            field,
            (encoding, primitiveField) ->
                getInt96ToLongTimestampDecoder(encoding, primitiveField, timeZone),
            INT96_ADAPTER,
            memoryContext);
      }
      if (type instanceof TimestampWithTimeZoneType timestampWithTimeZoneType
          && primitiveType == INT96) {
        if (timestampWithTimeZoneType.isShort()) {
          return new FlatColumnReader<>(
              field,
              TransformingValueDecoders::getInt96ToShortTimestampWithTimeZoneDecoder,
              LONG_ADAPTER,
              memoryContext);
        }
      }
      if (type instanceof TimestampType timestampType && primitiveType == INT64) {
        if (!(annotation instanceof TimestampLogicalTypeAnnotation timestampAnnotation)) {
          throw unsupportedException(type, field);
        }
        if (timestampType.isShort()) {
          return switch (timestampAnnotation.getUnit()) {
            case MILLIS -> new FlatColumnReader<>(
                field,
                TransformingValueDecoders::getInt64TimestampMillsToShortTimestampDecoder,
                LONG_ADAPTER,
                memoryContext);
            case MICROS -> new FlatColumnReader<>(
                field,
                TransformingValueDecoders::getInt64TimestampMicrosToShortTimestampDecoder,
                LONG_ADAPTER,
                memoryContext);
            case NANOS -> new FlatColumnReader<>(
                field,
                TransformingValueDecoders::getInt64TimestampNanosToShortTimestampDecoder,
                LONG_ADAPTER,
                memoryContext);
          };
        }
        return switch (timestampAnnotation.getUnit()) {
          case MILLIS -> new FlatColumnReader<>(
              field,
              TransformingValueDecoders::getInt64TimestampMillisToLongTimestampDecoder,
              INT96_ADAPTER,
              memoryContext);
          case MICROS -> new FlatColumnReader<>(
              field,
              TransformingValueDecoders::getInt64TimestampMicrosToLongTimestampDecoder,
              INT96_ADAPTER,
              memoryContext);
          case NANOS -> new FlatColumnReader<>(
              field,
              TransformingValueDecoders::getInt64TimestampNanosToLongTimestampDecoder,
              INT96_ADAPTER,
              memoryContext);
        };
      }
      if (type instanceof TimestampWithTimeZoneType timestampWithTimeZoneType
          && primitiveType == INT64) {
        if (!(annotation instanceof TimestampLogicalTypeAnnotation timestampAnnotation)) {
          throw unsupportedException(type, field);
        }
        if (timestampWithTimeZoneType.isShort()) {
          return switch (timestampAnnotation.getUnit()) {
            case MILLIS -> new FlatColumnReader<>(
                field,
                TransformingValueDecoders
                    ::getInt64TimestampMillsToShortTimestampWithTimeZoneDecoder,
                LONG_ADAPTER,
                memoryContext);
            case MICROS -> new FlatColumnReader<>(
                field,
                TransformingValueDecoders
                    ::getInt64TimestampMicrosToShortTimestampWithTimeZoneDecoder,
                LONG_ADAPTER,
                memoryContext);
            case NANOS -> throw unsupportedException(type, field);
          };
        }
        return switch (timestampAnnotation.getUnit()) {
          case MILLIS, NANOS -> throw unsupportedException(type, field);
          case MICROS -> new FlatColumnReader<>(
              field,
              TransformingValueDecoders::getInt64TimestampMicrosToLongTimestampWithTimeZoneDecoder,
              INT96_ADAPTER,
              memoryContext);
        };
      }
      if (type instanceof DecimalType decimalType
          && decimalType.isShort()
          && (primitiveType == INT32
              || primitiveType == INT64
              || primitiveType == FIXED_LEN_BYTE_ARRAY)) {
        if (annotation instanceof DecimalLogicalTypeAnnotation decimalAnnotation
            && !isDecimalRescaled(decimalAnnotation, decimalType)) {
          return new FlatColumnReader<>(
              field, ValueDecoders::getShortDecimalDecoder, LONG_ADAPTER, memoryContext);
        }
      }
      if (type instanceof DecimalType decimalType
          && !decimalType.isShort()
          && (primitiveType == BINARY || primitiveType == FIXED_LEN_BYTE_ARRAY)) {
        if (annotation instanceof DecimalLogicalTypeAnnotation decimalAnnotation
            && !isDecimalRescaled(decimalAnnotation, decimalType)) {
          return new FlatColumnReader<>(
              field, ValueDecoders::getLongDecimalDecoder, INT128_ADAPTER, memoryContext);
        }
      }
      if (type instanceof VarcharType varcharType
          && !varcharType.isUnbounded()
          && primitiveType == BINARY) {
        return new FlatColumnReader<>(
            field, ValueDecoders::getBoundedVarcharBinaryDecoder, BINARY_ADAPTER, memoryContext);
      }
      if (type instanceof CharType && primitiveType == BINARY) {
        return new FlatColumnReader<>(
            field, ValueDecoders::getCharBinaryDecoder, BINARY_ADAPTER, memoryContext);
      }
      if (type instanceof AbstractVariableWidthType && primitiveType == BINARY) {
        return new FlatColumnReader<>(
            field, ValueDecoders::getBinaryDecoder, BINARY_ADAPTER, memoryContext);
      }
      if (UUID.equals(type) && primitiveType == FIXED_LEN_BYTE_ARRAY) {
        // Iceberg 0.11.1 writes UUID as FIXED_LEN_BYTE_ARRAY without logical type annotation (see
        // https://github.com/apache/iceberg/pull/2913)
        // To support such files, we bet on the logical type to be UUID based on the Trino UUID type
        // check.
        if (annotation == null || isLogicalUuid(annotation)) {
          return new FlatColumnReader<>(
              field, ValueDecoders::getUuidDecoder, INT128_ADAPTER, memoryContext);
        }
        throw unsupportedException(type, field);
      }
    }

    return switch (primitiveType) {
      case BOOLEAN -> new BooleanColumnReader(field);
      case INT32 -> createDecimalColumnReader(field).orElse(new IntColumnReader(field));
      case INT64 -> {
        if (annotation instanceof TimeLogicalTypeAnnotation timeAnnotation) {
          if (timeAnnotation.getUnit() == MICROS) {
            yield new TimeMicrosColumnReader(field);
          }
          throw unsupportedException(type, field);
        }
        if (annotation instanceof TimestampLogicalTypeAnnotation timestampAnnotation) {
          if (timestampAnnotation.getUnit() == MILLIS) {
            yield new Int64TimestampMillisColumnReader(field);
          }
          if (timestampAnnotation.getUnit() == MICROS) {
            yield new TimestampMicrosColumnReader(field);
          }
          if (timestampAnnotation.getUnit() == NANOS) {
            yield new Int64TimestampNanosColumnReader(field);
          }
          throw unsupportedException(type, field);
        }
        yield createDecimalColumnReader(field).orElse(new LongColumnReader(field));
      }
      case INT96 -> new TimestampColumnReader(field, timeZone);
      case FLOAT -> new FloatColumnReader(field);
      case DOUBLE -> new DoubleColumnReader(field);
      case BINARY -> createDecimalColumnReader(field).orElse(new BinaryColumnReader(field));
      case FIXED_LEN_BYTE_ARRAY -> {
        Optional<PrimitiveColumnReader> decimalColumnReader = createDecimalColumnReader(field);
        if (decimalColumnReader.isPresent()) {
          yield decimalColumnReader.get();
        }
        if (isLogicalUuid(annotation)) {
          yield new UuidColumnReader(field);
        }
        if (annotation == null) {
          // Iceberg 0.11.1 writes UUID as FIXED_LEN_BYTE_ARRAY without logical type annotation
          // (see https://github.com/apache/iceberg/pull/2913)
          // To support such files, we bet on the type to be UUID,
          // which gets verified later, when reading the column data.
          yield new UuidColumnReader(field);
        }
        throw unsupportedException(type, field);
      }
    };
  }