in common/src/main/java/org/apache/comet/parquet/TypeUtil.java [119:225]
  public static void checkParquetType(ColumnDescriptor descriptor, DataType sparkType) {
    PrimitiveType.PrimitiveTypeName typeName =
        descriptor.getPrimitiveType().getPrimitiveTypeName();
    LogicalTypeAnnotation logicalTypeAnnotation =
        descriptor.getPrimitiveType().getLogicalTypeAnnotation();
    boolean allowTypePromotion = (boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get();
    if (sparkType instanceof NullType) {
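      // A NullType column contains only nulls, so any Parquet physical type is acceptable.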
      return;
    }
    switch (typeName) {
      case BOOLEAN:
        if (sparkType == DataTypes.BooleanType) return;
        break;
      case INT32:
        if (sparkType == DataTypes.IntegerType || canReadAsIntDecimal(descriptor, sparkType)) {
          return;
        } else if (sparkType == DataTypes.LongType
            && isUnsignedIntTypeMatched(logicalTypeAnnotation, 32)) {
          // In `ParquetToSparkSchemaConverter`, we map Parquet UINT32 to Spark's LongType.
          // An unsigned int32 is stored as a plain signed int32 in Parquet (for example when
          // dictionary encoding falls back to plain encoding), so we read the values as longs.
          return;
        } else if (sparkType == DataTypes.LongType && allowTypePromotion) {
          // Comet allows schema evolution from int to long when
          // `spark.comet.schemaEvolution.enabled` is set.
          return;
        } else if (sparkType == DataTypes.ByteType || sparkType == DataTypes.ShortType) {
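          // Parquet has no 8- or 16-bit physical types; byte and short columns are stored
          // as INT32 and can always be read back from it.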
          return;
        } else if (sparkType == DataTypes.DateType) {
          // TODO: use dateTimeRebaseMode from Spark side
          return;
        } else if (sparkType instanceof YearMonthIntervalType) {
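          // Spark stores a year-month interval as a single int (the number of months),
          // so it is layout-compatible with INT32.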
          return;
        } else if (sparkType == DataTypes.DoubleType && isSpark40Plus()) {
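          // Spark 4.0+ supports widening reads of int columns as double.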
          return;
        } else if (sparkType == TimestampNTZType$.MODULE$
            && isSpark40Plus()
            && logicalTypeAnnotation instanceof DateLogicalTypeAnnotation) {
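          // Spark 4.0+ supports widening reads of DATE columns as timestamp_ntz.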
          return;
        }
        break;
      case INT64:
        if (sparkType == DataTypes.LongType || canReadAsLongDecimal(descriptor, sparkType)) {
          return;
        } else if (isLongDecimal(sparkType)
            && isUnsignedIntTypeMatched(logicalTypeAnnotation, 64)) {
          // In `ParquetToSparkSchemaConverter`, we map Parquet UINT64 to Spark's Decimal(20, 0).
          // An unsigned int64 is stored as a plain signed int64 in Parquet (for example when
          // dictionary encoding falls back to plain encoding), so we read the values as decimals.
          return;
        } else if (isTimestampTypeMatched(logicalTypeAnnotation, TimeUnit.MICROS)
            && (sparkType == TimestampNTZType$.MODULE$ || sparkType == DataTypes.TimestampType)) {
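          // validateTimestampType rejects reading a UTC-adjusted (LTZ) Parquet timestamp
          // as Spark's TimestampNTZ, which would silently misinterpret the values.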
          validateTimestampType(logicalTypeAnnotation, sparkType);
          // TODO: use dateTimeRebaseMode from Spark side
          return;
        } else if (isTimestampTypeMatched(logicalTypeAnnotation, TimeUnit.MILLIS)
            && (sparkType == TimestampNTZType$.MODULE$ || sparkType == DataTypes.TimestampType)) {
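          // Millisecond-precision timestamps are accepted as well; the same LTZ/NTZ
          // adjustment check applies.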
          validateTimestampType(logicalTypeAnnotation, sparkType);
          return;
        }
        break;
      case INT96:
        if (sparkType == TimestampNTZType$.MODULE$) {
          if (isSpark40Plus()) return; // Spark 4.0+ supports Timestamp NTZ with INT96
          convertErrorForTimestampNTZ(typeName.name());
        } else if (sparkType == DataTypes.TimestampType) {
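          // INT96 is the legacy Hive/Impala timestamp encoding and maps to Spark's
          // (UTC-adjusted) TimestampType.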
          return;
        }
        break;
      case FLOAT:
        if (sparkType == DataTypes.FloatType) return;
        // Comet allows schema evolution from float to double when
        // `spark.comet.schemaEvolution.enabled` is set.
        if (sparkType == DataTypes.DoubleType && allowTypePromotion) return;
        break;
      case DOUBLE:
        if (sparkType == DataTypes.DoubleType) return;
        break;
      case BINARY:
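        // BINARY backs Spark strings, raw byte arrays, and decimals stored as
        // variable-length bytes.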
        if (sparkType == DataTypes.StringType
            || sparkType == DataTypes.BinaryType
            || canReadAsBinaryDecimal(descriptor, sparkType)) {
          return;
        }
        break;
      case FIXED_LEN_BYTE_ARRAY:
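        // Decimals may be stored as FIXED_LEN_BYTE_ARRAY sized to fit the precision,
        // so all three decimal readers are considered here.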
        if (canReadAsIntDecimal(descriptor, sparkType)
            || canReadAsLongDecimal(descriptor, sparkType)
            || canReadAsBinaryDecimal(descriptor, sparkType)
            || sparkType == DataTypes.BinaryType
            // Iceberg maps the Parquet UUID logical type to Spark's StringType.
            || (sparkType == DataTypes.StringType
                && logicalTypeAnnotation
                    instanceof LogicalTypeAnnotation.UUIDLogicalTypeAnnotation)) {
          return;
        }
        break;
      default:
        break;
    }
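    // Falling through the switch means the Parquet/Spark combination is unsupported;
    // raise the standard Spark schema-conversion error with the column path and both types.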
    throw new SchemaColumnConvertNotSupportedException(
        Arrays.toString(descriptor.getPath()),
        descriptor.getPrimitiveType().getPrimitiveTypeName().toString(),
        sparkType.catalogString());
  }
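
  // A minimal usage sketch (illustrative only, not part of this file): callers typically walk
  // the Parquet leaf columns and the corresponding Spark fields together and validate each
  // pair. The one-to-one, in-order pairing below is an assumption for illustration; real
  // callers resolve columns by name or field id before checking. `checkAll` is a hypothetical
  // helper; `MessageType#getColumns()` and `StructType#fields()` are existing APIs.
  //
  //   import java.util.List;
  //   import org.apache.parquet.column.ColumnDescriptor;
  //   import org.apache.parquet.schema.MessageType;
  //   import org.apache.spark.sql.types.StructField;
  //   import org.apache.spark.sql.types.StructType;
  //
  //   static void checkAll(MessageType parquetSchema, StructType sparkSchema) {
  //     List<ColumnDescriptor> columns = parquetSchema.getColumns();
  //     StructField[] fields = sparkSchema.fields();
  //     for (int i = 0; i < columns.size(); i++) {
  //       // Throws SchemaColumnConvertNotSupportedException on the first mismatch.
  //       TypeUtil.checkParquetType(columns.get(i), fields[i].dataType());
  //     }
  //   }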