in paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java [165:224]
/**
 * Decides how a change of a field's type from {@code oldType} to {@code newType} should be
 * handled.
 *
 * <p>The checks run in the order below and the first one that applies determines the result:
 *
 * <ol>
 *   <li>Types that are equal when nullability is ignored: {@code CONVERT}.
 *   <li>Both type roots in {@code STRING_TYPES}: {@code CONVERT} when the old length is not
 *       greater than the new length, otherwise {@code IGNORE}.
 *   <li>Only the new type root in {@code STRING_TYPES}: {@code CONVERT} if {@code typeMapping}
 *       contains {@code ALLOW_NON_STRING_TO_STRING}; otherwise the remaining checks are tried.
 *   <li>Both type roots in {@code BINARY_TYPES}: same length rule as for strings.
 *   <li>Both type roots in {@code INTEGER_TYPES}, or both in {@code FLOATING_POINT_TYPES}:
 *       {@code CONVERT} when the old root's position in the list is not after the new root's
 *       position, otherwise {@code IGNORE}.
 *   <li>Both type roots in {@code DECIMAL_TYPES}: {@code IGNORE} when neither the precision nor
 *       the scale of the new type exceeds that of the old type, otherwise {@code CONVERT}.
 *   <li>Both type roots in {@code TIMESTAMP_TYPES}: {@code CONVERT} when the old precision is
 *       not greater than the new precision, otherwise {@code IGNORE}.
 *   <li>Anything else: {@code EXCEPTION}.
 * </ol>
 *
 * @param oldType the current type of the field
 * @param newType the proposed type of the field
 * @param typeMapping consulted only for the {@code ALLOW_NON_STRING_TO_STRING} mode
 * @return the action selected by the first applicable check above
 */
public static ConvertAction canConvert(
        DataType oldType, DataType newType, TypeMapping typeMapping) {
    if (oldType.equalsIgnoreNullable(newType)) {
        return ConvertAction.CONVERT;
    }

    // Character string family: compare declared lengths.
    boolean oldIsString = STRING_TYPES.indexOf(oldType.getTypeRoot()) >= 0;
    boolean newIsString = STRING_TYPES.indexOf(newType.getTypeRoot()) >= 0;
    if (oldIsString && newIsString) {
        if (DataTypeChecks.getLength(oldType) <= DataTypeChecks.getLength(newType)) {
            return ConvertAction.CONVERT;
        }
        return ConvertAction.IGNORE;
    }

    // A non-string type may turn into a string type, but only when the mapping mode opts in.
    if (newIsString
            && !oldIsString
            && typeMapping.containsMode(
                    TypeMapping.TypeMappingMode.ALLOW_NON_STRING_TO_STRING)) {
        return ConvertAction.CONVERT;
    }

    // Binary family: compare declared lengths.
    boolean oldIsBinary = BINARY_TYPES.indexOf(oldType.getTypeRoot()) >= 0;
    boolean newIsBinary = BINARY_TYPES.indexOf(newType.getTypeRoot()) >= 0;
    if (oldIsBinary && newIsBinary) {
        if (DataTypeChecks.getLength(oldType) <= DataTypeChecks.getLength(newType)) {
            return ConvertAction.CONVERT;
        }
        return ConvertAction.IGNORE;
    }

    // Integer family: compare the positions of the two roots inside INTEGER_TYPES.
    int oldIntegerRank = INTEGER_TYPES.indexOf(oldType.getTypeRoot());
    int newIntegerRank = INTEGER_TYPES.indexOf(newType.getTypeRoot());
    if (oldIntegerRank >= 0 && newIntegerRank >= 0) {
        if (oldIntegerRank <= newIntegerRank) {
            return ConvertAction.CONVERT;
        }
        return ConvertAction.IGNORE;
    }

    // Floating point family: compare the positions inside FLOATING_POINT_TYPES.
    int oldFloatRank = FLOATING_POINT_TYPES.indexOf(oldType.getTypeRoot());
    int newFloatRank = FLOATING_POINT_TYPES.indexOf(newType.getTypeRoot());
    if (oldFloatRank >= 0 && newFloatRank >= 0) {
        if (oldFloatRank <= newFloatRank) {
            return ConvertAction.CONVERT;
        }
        return ConvertAction.IGNORE;
    }

    // Decimal family: ignored only when neither precision nor scale grows.
    boolean oldIsDecimal = DECIMAL_TYPES.indexOf(oldType.getTypeRoot()) >= 0;
    boolean newIsDecimal = DECIMAL_TYPES.indexOf(newType.getTypeRoot()) >= 0;
    if (oldIsDecimal && newIsDecimal) {
        boolean nothingGrows =
                DataTypeChecks.getPrecision(newType) <= DataTypeChecks.getPrecision(oldType)
                        && DataTypeChecks.getScale(newType)
                                <= DataTypeChecks.getScale(oldType);
        if (nothingGrows) {
            return ConvertAction.IGNORE;
        }
        return ConvertAction.CONVERT;
    }

    // Timestamp family: compare precisions.
    boolean oldIsTimestamp = TIMESTAMP_TYPES.indexOf(oldType.getTypeRoot()) >= 0;
    boolean newIsTimestamp = TIMESTAMP_TYPES.indexOf(newType.getTypeRoot()) >= 0;
    if (oldIsTimestamp && newIsTimestamp) {
        if (DataTypeChecks.getPrecision(oldType) <= DataTypeChecks.getPrecision(newType)) {
            return ConvertAction.CONVERT;
        }
        return ConvertAction.IGNORE;
    }

    // The two types do not belong to a common family handled above.
    return ConvertAction.EXCEPTION;
}