public static void checkParquetType(ColumnDescriptor descriptor, DataType sparkType)

in common/src/main/java/org/apache/comet/parquet/TypeUtil.java [119:225]


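  /**
   * Checks that the Parquet column described by {@code descriptor} can be read as the requested
   * Spark {@code sparkType}; throws {@link SchemaColumnConvertNotSupportedException} otherwise.
   */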
  public static void checkParquetType(ColumnDescriptor descriptor, DataType sparkType) {
    PrimitiveType.PrimitiveTypeName typeName = descriptor.getPrimitiveType().getPrimitiveTypeName();
    LogicalTypeAnnotation logicalTypeAnnotation =
        descriptor.getPrimitiveType().getLogicalTypeAnnotation();
    boolean allowTypePromotion = (boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get();

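    // A NullType read is always allowed, regardless of the underlying Parquet type.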
    if (sparkType instanceof NullType) {
      return;
    }

    switch (typeName) {
      case BOOLEAN:
        if (sparkType == DataTypes.BooleanType) return;
        break;
      case INT32:
        if (sparkType == DataTypes.IntegerType || canReadAsIntDecimal(descriptor, sparkType)) {
          return;
        } else if (sparkType == DataTypes.LongType
            && isUnsignedIntTypeMatched(logicalTypeAnnotation, 32)) {
          // In `ParquetToSparkSchemaConverter`, we map Parquet UINT32 to Spark's LongType.
          // Unsigned int32 values are stored as plain signed int32 in Parquet when dictionary
          // encoding falls back to plain encoding, so we read them as long values.
          return;
        } else if (sparkType == DataTypes.LongType && allowTypePromotion) {
          // In Comet we allow schema evolution from int to long when
          // `spark.comet.schemaEvolution.enabled` is set.
          return;
        } else if (sparkType == DataTypes.ByteType || sparkType == DataTypes.ShortType) {
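          // Parquet has no 1- or 2-byte physical type; byte and short are stored as INT32.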
          return;
        } else if (sparkType == DataTypes.DateType) {
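          // Parquet DATE values are stored as INT32 days since the Unix epoch.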
          // TODO: use dateTimeRebaseMode from Spark side
          return;
        } else if (sparkType instanceof YearMonthIntervalType) {
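          // Spark's YearMonthIntervalType is stored as an INT32 number of months.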
          return;
        } else if (sparkType == DataTypes.DoubleType && isSpark40Plus()) {
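          // Spark 4.0+ supports widening int32 reads to DoubleType.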
          return;
        } else if (sparkType == TimestampNTZType$.MODULE$
            && isSpark40Plus()
            && logicalTypeAnnotation instanceof DateLogicalTypeAnnotation) {
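          // Spark 4.0+ allows reading DATE values as TimestampNTZ.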
          return;
        }
        break;
      case INT64:
        if (sparkType == DataTypes.LongType || canReadAsLongDecimal(descriptor, sparkType)) {
          return;
        } else if (isLongDecimal(sparkType)
            && isUnsignedIntTypeMatched(logicalTypeAnnotation, 64)) {
          // In `ParquetToSparkSchemaConverter`, we map Parquet UINT64 to Spark's Decimal(20, 0).
          // Unsigned int64 values are stored as plain signed int64 in Parquet when dictionary
          // encoding falls back to plain encoding, so we read them as decimal values.
          return;
        } else if (isTimestampTypeMatched(logicalTypeAnnotation, TimeUnit.MICROS)
            && (sparkType == TimestampNTZType$.MODULE$ || sparkType == DataTypes.TimestampType)) {
          validateTimestampType(logicalTypeAnnotation, sparkType);
          // TODO: use dateTimeRebaseMode from Spark side
          return;
        } else if (isTimestampTypeMatched(logicalTypeAnnotation, TimeUnit.MILLIS)
            && (sparkType == TimestampNTZType$.MODULE$ || sparkType == DataTypes.TimestampType)) {
          validateTimestampType(logicalTypeAnnotation, sparkType);
          return;
        }
        break;
      case INT96:
        if (sparkType == TimestampNTZType$.MODULE$) {
          if (isSpark40Plus()) return; // Spark 4.0+ supports Timestamp NTZ with INT96
          convertErrorForTimestampNTZ(typeName.name());
        } else if (sparkType == DataTypes.TimestampType) {
          return;
        }
        break;
      case FLOAT:
        if (sparkType == DataTypes.FloatType) return;
        // In Comet we allow schema evolution from float to double when
        // `spark.comet.schemaEvolution.enabled` is set.
        if (sparkType == DataTypes.DoubleType && allowTypePromotion) return;
        break;
      case DOUBLE:
        if (sparkType == DataTypes.DoubleType) return;
        break;
      case BINARY:
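        // BINARY can back strings, raw bytes, or decimals encoded as binary.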
        if (sparkType == DataTypes.StringType
            || sparkType == DataTypes.BinaryType
            || canReadAsBinaryDecimal(descriptor, sparkType)) {
          return;
        }
        break;
      case FIXED_LEN_BYTE_ARRAY:
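        // Fixed-length binary backs decimals, UUIDs, and raw byte arrays.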
        if (canReadAsIntDecimal(descriptor, sparkType)
            || canReadAsLongDecimal(descriptor, sparkType)
            || canReadAsBinaryDecimal(descriptor, sparkType)
            || sparkType == DataTypes.BinaryType
            // For UUID columns: Iceberg maps the Parquet UUID logical type to StringType.
            || (sparkType == DataTypes.StringType
                && logicalTypeAnnotation
                    instanceof LogicalTypeAnnotation.UUIDLogicalTypeAnnotation)) {
          return;
        }
        break;
      default:
        break;
    }

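    // No branch matched: this Parquet column cannot be read as the requested Spark type.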
    throw new SchemaColumnConvertNotSupportedException(
        Arrays.toString(descriptor.getPath()),
        descriptor.getPrimitiveType().getPrimitiveTypeName().toString(),
        sparkType.catalogString());
  }
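
A minimal sketch of how the check might be driven over a whole read schema. The helper below is
hypothetical (not part of TypeUtil), and assumes the Parquet MessageType from the file footer and
the requested Spark StructType as inputs:

  // Hypothetical helper: validate every requested column before building readers.
  // Assumes imports of org.apache.parquet.schema.MessageType,
  // org.apache.parquet.column.ColumnDescriptor, and org.apache.spark.sql.types.*.
  public static void checkReadSchema(MessageType fileSchema, StructType readSchema) {
    for (ColumnDescriptor descriptor : fileSchema.getColumns()) {
      // Top-level columns only, for brevity; nested paths would need full resolution.
      StructField field = readSchema.apply(descriptor.getPath()[0]);
      // Throws SchemaColumnConvertNotSupportedException on an unsupported conversion.
      TypeUtil.checkParquetType(descriptor, field.dataType());
    }
  }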