in flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java [514:585]
/**
 * Infers a data type that is wide enough to hold values of both given types.
 *
 * <p>Nullability is set aside while the two types are compared and is re-applied to the result:
 * the returned type is nullable if at least one input is nullable, and {@code NOT NULL}
 * otherwise. This holds for every merge branch, including the timestamp ones.
 *
 * <p>Merge rules, checked in this order:
 *
 * <ul>
 *   <li>identical types merge to themselves;
 *   <li>two timestamps of the same kind keep that kind with the larger of the two precisions;
 *   <li>two timestamp-family types of different kinds merge to {@code TIMESTAMP} with {@link
 *       TimestampType#MAX_PRECISION};
 *   <li>two integer numerics merge to {@code BIGINT};
 *   <li>two character strings merge to {@code STRING};
 *   <li>two approximate numerics merge to {@code DOUBLE};
 *   <li>two decimals merge to a decimal keeping the larger integer-digit count and the larger
 *       scale;
 *   <li>a decimal and another exact numeric are merged by {@code mergeExactNumericsIntoDecimal}.
 * </ul>
 *
 * @param lType the first type to merge
 * @param rType the second type to merge
 * @return the merged type, carrying the combined nullability of both inputs
 * @throws IllegalStateException if none of the merge rules applies to the given pair
 */
public static DataType inferWiderType(DataType lType, DataType rType) {
    // Ignore nullability during data type merge; it is restored on the result at the end.
    boolean nullable = lType.isNullable() || rType.isNullable();
    lType = lType.notNull();
    rType = rType.notNull();

    DataType mergedType;
    if (lType.equals(rType)) {
        // identical type
        mergedType = rType;
    } else if (lType instanceof TimestampType && rType instanceof TimestampType) {
        // Assign instead of returning early so that the nullability computed above is
        // applied to timestamp merges exactly as it is to every other branch.
        mergedType =
                DataTypes.TIMESTAMP(
                        Math.max(
                                ((TimestampType) lType).getPrecision(),
                                ((TimestampType) rType).getPrecision()));
    } else if (lType instanceof ZonedTimestampType && rType instanceof ZonedTimestampType) {
        mergedType =
                DataTypes.TIMESTAMP_TZ(
                        Math.max(
                                ((ZonedTimestampType) lType).getPrecision(),
                                ((ZonedTimestampType) rType).getPrecision()));
    } else if (lType instanceof LocalZonedTimestampType
            && rType instanceof LocalZonedTimestampType) {
        mergedType =
                DataTypes.TIMESTAMP_LTZ(
                        Math.max(
                                ((LocalZonedTimestampType) lType).getPrecision(),
                                ((LocalZonedTimestampType) rType).getPrecision()));
    } else if (lType.is(DataTypeFamily.TIMESTAMP) && rType.is(DataTypeFamily.TIMESTAMP)) {
        // Timestamps of different kinds: fall back to a plain TIMESTAMP at maximum precision.
        mergedType = DataTypes.TIMESTAMP(TimestampType.MAX_PRECISION);
    } else if (lType.is(DataTypeFamily.INTEGER_NUMERIC)
            && rType.is(DataTypeFamily.INTEGER_NUMERIC)) {
        mergedType = DataTypes.BIGINT();
    } else if (lType.is(DataTypeFamily.CHARACTER_STRING)
            && rType.is(DataTypeFamily.CHARACTER_STRING)) {
        mergedType = DataTypes.STRING();
    } else if (lType.is(DataTypeFamily.APPROXIMATE_NUMERIC)
            && rType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
        mergedType = DataTypes.DOUBLE();
    } else if (lType instanceof DecimalType && rType instanceof DecimalType) {
        // Merge two decimal types: keep enough digits on both sides of the decimal point.
        DecimalType lhsDecimal = (DecimalType) lType;
        DecimalType rhsDecimal = (DecimalType) rType;
        int resultIntDigits =
                Math.max(
                        lhsDecimal.getPrecision() - lhsDecimal.getScale(),
                        rhsDecimal.getPrecision() - rhsDecimal.getScale());
        int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale());
        int resultPrecision = resultIntDigits + resultScale;
        // The combined digit count must still fit into the widest supported DECIMAL.
        Preconditions.checkArgument(
                resultPrecision <= DecimalType.MAX_PRECISION,
                String.format(
                        "Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available",
                        lType,
                        rType,
                        resultPrecision,
                        DecimalType.MAX_PRECISION));
        mergedType = DataTypes.DECIMAL(resultPrecision, resultScale);
    } else if (lType instanceof DecimalType && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
        // Merge decimal and int
        mergedType = mergeExactNumericsIntoDecimal((DecimalType) lType, rType);
    } else if (rType instanceof DecimalType && lType.is(DataTypeFamily.EXACT_NUMERIC)) {
        // Merge decimal and int
        mergedType = mergeExactNumericsIntoDecimal((DecimalType) rType, lType);
    } else {
        throw new IllegalStateException(
                String.format("Incompatible types: \"%s\" and \"%s\"", lType, rType));
    }

    // Restore the nullability that was stripped before the comparison.
    return nullable ? mergedType.nullable() : mergedType.notNull();
}