in flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java [607:709]
/**
 * Coerces a single field value from {@code originalType} to {@code destinationType}.
 *
 * <p>The destination type is checked against a fixed sequence of supported types, and the
 * actual conversion is delegated to the matching {@code coerceToXxx} helper. When both types
 * belong to the same timestamp family (without time zone, with time zone, or with local time
 * zone) the value is returned unchanged.
 *
 * @param timezone time zone identifier forwarded to the timestamp coercion helpers
 * @param originalField value to coerce; {@code null} is returned as {@code null} without
 *     inspecting either type
 * @param originalType type the value currently has; used for string coercion, for the
 *     same-family timestamp checks, and in the error message
 * @param destinationType type the value should be coerced to
 * @return the coerced value, or {@code null} if {@code originalField} is {@code null}
 * @throws IllegalArgumentException if the destination type is not supported, or if the value
 *     cannot be fit into a DATE column; failures raised by the delegated helpers propagate
 *     unchanged
 */
static Object coerceObject(
        String timezone,
        Object originalField,
        DataType originalType,
        DataType destinationType) {
    if (originalField == null) {
        return null;
    }
    if (destinationType instanceof BooleanType) {
        // Boolean.valueOf(String) yields true only for "true" (case-insensitive);
        // every other textual form becomes false.
        return Boolean.valueOf(originalField.toString());
    }
    if (destinationType instanceof TinyIntType) {
        return coerceToByte(originalField);
    }
    if (destinationType instanceof SmallIntType) {
        return coerceToShort(originalField);
    }
    if (destinationType instanceof IntType) {
        return coerceToInt(originalField);
    }
    if (destinationType instanceof BigIntType) {
        return coerceToLong(originalField);
    }
    if (destinationType instanceof DecimalType) {
        DecimalType decimalType = (DecimalType) destinationType;
        return coerceToDecimal(
                originalField, decimalType.getPrecision(), decimalType.getScale());
    }
    if (destinationType instanceof FloatType) {
        return coerceToFloat(originalField);
    }
    if (destinationType instanceof DoubleType) {
        return coerceToDouble(originalField);
    }
    if (destinationType instanceof CharType || destinationType instanceof VarCharType) {
        // Both character types share the same string coercion.
        return coerceToString(originalField, originalType);
    }
    if (destinationType instanceof BinaryType || destinationType instanceof VarBinaryType) {
        // Both binary types share the same byte-array coercion.
        return coerceToBytes(originalField);
    }
    if (destinationType instanceof DateType) {
        try {
            return coerceToLong(originalField);
        } catch (IllegalArgumentException e) {
            // Re-throw with a DATE-specific message, keeping the original failure as the
            // cause so the underlying reason is not lost.
            throw new IllegalArgumentException(
                    String.format("Cannot fit \"%s\" into a DATE column.", originalField), e);
        }
    }
    if (destinationType.is(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
            && originalType.is(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
        // For now, the internal representations of TimestampData / ZonedTimestampData /
        // LocalZonedTimestampData do not differ, so no precision conversion is needed
        // and the value is passed through as-is.
        return originalField;
    }
    if (destinationType.is(DataTypeRoot.TIMESTAMP_WITH_TIME_ZONE)
            && originalType.is(DataTypeRoot.TIMESTAMP_WITH_TIME_ZONE)) {
        return originalField;
    }
    if (destinationType.is(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
            && originalType.is(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {
        return originalField;
    }
    // From here on the source and destination are in different families, so a real
    // conversion is required.
    if (destinationType instanceof TimestampType) {
        return coerceToTimestamp(originalField, timezone);
    }
    if (destinationType instanceof LocalZonedTimestampType) {
        return coerceToLocalZonedTimestamp(originalField, timezone);
    }
    if (destinationType instanceof ZonedTimestampType) {
        return coerceToZonedTimestamp(originalField, timezone);
    }
    throw new IllegalArgumentException(
            String.format(
                    "Column type \"%s\" doesn't support type coercion to \"%s\"",
                    originalType, destinationType));
}