in sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java [71:229]
public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType sparkType) {
  PrimitiveType.PrimitiveTypeName typeName = descriptor.getPrimitiveType().getPrimitiveTypeName();
  switch (typeName) {
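    // Each case below pairs a Parquet physical type with the Spark types it may be decoded
    // into; if no branch matches, control falls through to the "conversion not supported"
    // exception at the bottom of the method.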
    case BOOLEAN -> {
      if (sparkType == DataTypes.BooleanType) {
        return new BooleanUpdater();
      }
    }
    case INT32 -> {
      if (sparkType == DataTypes.IntegerType || canReadAsIntDecimal(descriptor, sparkType)) {
        return new IntegerUpdater();
      } else if (sparkType == DataTypes.LongType && isUnsignedIntTypeMatched(32)) {
        // In `ParquetToSparkSchemaConverter`, we map Parquet UINT32 to Spark's LongType.
        // When dictionary encoding falls back to plain encoding, unsigned int32 values are
        // stored as plain signed int32 in Parquet, so we read them back as long values.
        return new UnsignedIntegerUpdater();
      } else if (sparkType == DataTypes.LongType || canReadAsLongDecimal(descriptor, sparkType)) {
        return new IntegerToLongUpdater();
      } else if (canReadAsBinaryDecimal(descriptor, sparkType)) {
        return new IntegerToBinaryUpdater();
      } else if (sparkType == DataTypes.ByteType) {
        return new ByteUpdater();
      } else if (sparkType == DataTypes.ShortType) {
        return new ShortUpdater();
      } else if (sparkType == DataTypes.DoubleType) {
        return new IntegerToDoubleUpdater();
      } else if (sparkType == DataTypes.DateType) {
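        // `datetimeRebaseMode` is one of LEGACY, CORRECTED, or EXCEPTION and controls how
        // dates written in the legacy hybrid Julian/Gregorian calendar map onto Spark's
        // Proleptic Gregorian calendar: CORRECTED reads values as-is, LEGACY rebases them,
        // and EXCEPTION fails on values that differ between the two calendars.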
if ("CORRECTED".equals(datetimeRebaseMode)) {
return new IntegerUpdater();
} else {
boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
return new IntegerWithRebaseUpdater(failIfRebase);
}
} else if (sparkType == DataTypes.TimestampNTZType && isDateTypeMatched(descriptor)) {
if ("CORRECTED".equals(datetimeRebaseMode)) {
return new DateToTimestampNTZUpdater();
} else {
boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
return new DateToTimestampNTZWithRebaseUpdater(failIfRebase);
}
} else if (sparkType instanceof YearMonthIntervalType) {
return new IntegerUpdater();
} else if (canReadAsDecimal(descriptor, sparkType)) {
return new IntegerToDecimalUpdater(descriptor, (DecimalType) sparkType);
}
}
    case INT64 -> {
      // This is where we implement support for the valid type conversions.
      if (sparkType == DataTypes.LongType || canReadAsLongDecimal(descriptor, sparkType)) {
        if (DecimalType.is32BitDecimalType(sparkType)) {
          return new DowncastLongUpdater();
        } else {
          return new LongUpdater();
        }
      } else if (canReadAsBinaryDecimal(descriptor, sparkType)) {
        return new LongToBinaryUpdater();
      } else if (isLongDecimal(sparkType) && isUnsignedIntTypeMatched(64)) {
        // In `ParquetToSparkSchemaConverter`, we map Parquet UINT64 to Spark's Decimal(20, 0).
        // When dictionary encoding falls back to plain encoding, unsigned int64 values are
        // stored as plain signed int64 in Parquet, so we read them back as decimal values.
        return new UnsignedLongUpdater();
      } else if (sparkType == DataTypes.TimestampType &&
          isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
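        // Parquet TIMESTAMP(MICROS) already matches Spark's microsecond precision, so only
        // a calendar rebase (for files produced by legacy writers) may be required.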
if ("CORRECTED".equals(datetimeRebaseMode)) {
return new LongUpdater();
} else {
boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
return new LongWithRebaseUpdater(failIfRebase, datetimeRebaseTz);
}
} else if (sparkType == DataTypes.TimestampType &&
isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
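        // Parquet TIMESTAMP(MILLIS) values are widened to microseconds by the
        // LongAsMicros* updaters below.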
if ("CORRECTED".equals(datetimeRebaseMode)) {
return new LongAsMicrosUpdater();
} else {
final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
return new LongAsMicrosRebaseUpdater(failIfRebase, datetimeRebaseTz);
}
} else if (sparkType == DataTypes.TimestampNTZType &&
isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
// TIMESTAMP_NTZ is a new data type and has no legacy files that need to do rebase.
return new LongUpdater();
} else if (sparkType == DataTypes.TimestampNTZType &&
isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
// TIMESTAMP_NTZ is a new data type and has no legacy files that need to do rebase.
return new LongAsMicrosUpdater();
} else if (sparkType instanceof DayTimeIntervalType) {
return new LongUpdater();
} else if (canReadAsDecimal(descriptor, sparkType)) {
return new LongToDecimalUpdater(descriptor, (DecimalType) sparkType);
} else if (sparkType instanceof TimeType) {
return new LongUpdater();
}
}
    case FLOAT -> {
      if (sparkType == DataTypes.FloatType) {
        return new FloatUpdater();
      } else if (sparkType == DataTypes.DoubleType) {
        return new FloatToDoubleUpdater();
      }
    }
    case DOUBLE -> {
      if (sparkType == DataTypes.DoubleType) {
        return new DoubleUpdater();
      }
    }
    case INT96 -> {
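      // INT96 is the legacy 12-byte timestamp layout written by old Impala/Hive writers:
      // 8 bytes of nanos-of-day followed by a 4-byte Julian day number, little-endian.
      // In sketch form, the BinaryToSQLTimestamp* updaters decode it roughly as:
      //   ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
      //   long nanosOfDay = buf.getLong();
      //   int julianDay = buf.getInt();
      //   long micros = (julianDay - 2440588 /* Julian day of 1970-01-01 */)
      //       * MICROS_PER_DAY + nanosOfDay / 1000;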
      if (sparkType == DataTypes.TimestampNTZType) {
        // TimestampNTZ type does not require rebasing due to its lack of time zone context.
        return new BinaryToSQLTimestampUpdater();
      } else if (sparkType == DataTypes.TimestampType) {
        final boolean failIfRebase = "EXCEPTION".equals(int96RebaseMode);
        if (!shouldConvertTimestamps()) {
          if ("CORRECTED".equals(int96RebaseMode)) {
            return new BinaryToSQLTimestampUpdater();
          } else {
            return new BinaryToSQLTimestampRebaseUpdater(failIfRebase, int96RebaseTz);
          }
        } else {
          if ("CORRECTED".equals(int96RebaseMode)) {
            return new BinaryToSQLTimestampConvertTzUpdater(convertTz);
          } else {
            return new BinaryToSQLTimestampConvertTzRebaseUpdater(
              failIfRebase,
              convertTz,
              int96RebaseTz);
          }
        }
      }
    }
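    // BINARY covers UTF-8 strings, raw byte arrays, and decimals stored as
    // variable-length bytes.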
    case BINARY -> {
      if (sparkType instanceof StringType || sparkType == DataTypes.BinaryType ||
          canReadAsBinaryDecimal(descriptor, sparkType)) {
        return new BinaryUpdater();
      } else if (canReadAsDecimal(descriptor, sparkType)) {
        return new BinaryToDecimalUpdater(descriptor, (DecimalType) sparkType);
      }
    }
    case FIXED_LEN_BYTE_ARRAY -> {
      int arrayLen = descriptor.getPrimitiveType().getTypeLength();
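      // Parquet stores a decimal in INT32, INT64, BINARY, or FIXED_LEN_BYTE_ARRAY depending
      // on its precision; the canReadAs*Decimal helpers pick the updater that unpacks the
      // fixed-length bytes into Spark's int-, long-, or binary-backed decimal representation.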
      if (canReadAsIntDecimal(descriptor, sparkType)) {
        return new FixedLenByteArrayAsIntUpdater(arrayLen);
      } else if (canReadAsLongDecimal(descriptor, sparkType)) {
        return new FixedLenByteArrayAsLongUpdater(arrayLen);
      } else if (canReadAsBinaryDecimal(descriptor, sparkType)) {
        return new FixedLenByteArrayUpdater(arrayLen);
      } else if (sparkType == DataTypes.BinaryType) {
        return new FixedLenByteArrayUpdater(arrayLen);
      } else if (canReadAsDecimal(descriptor, sparkType)) {
        return new FixedLenByteArrayToDecimalUpdater(descriptor, (DecimalType) sparkType);
      }
    }
    default -> {}
  }
  // Reaching this point means the combination of Spark and Parquet types is either invalid
  // or not supported.
  throw constructConvertNotSupportedException(descriptor, sparkType);
}
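For orientation, a minimal sketch of how a caller might consume this factory. The names `updaterFactory`, `columnVector`, `valuesReader`, and `batchSize`, as well as the exact `readValues` shape, are assumptions for illustration rather than code from this file:

  // Illustrative only: the vectorized reader resolves one updater per column up front,
  // then reuses it to decode every batch into the Spark column vector.
  ParquetVectorUpdater updater = updaterFactory.getUpdater(descriptor, sparkType);
  // Method shape assumed from the ParquetVectorUpdater interface:
  updater.readValues(batchSize, /* offset */ 0, columnVector, valuesReader);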