static Object coerceObject()

in flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java [607:709]


    /**
     * Coerces a single field value into the representation selected by {@code destinationType}.
     *
     * <p>The conversion is chosen by inspecting {@code destinationType}; {@code originalType} is
     * only consulted for string conversion and for the timestamp pass-through, where a value is
     * returned untouched when the source and destination types share the same timestamp root.
     *
     * @param timezone time zone handed to the timestamp coercion helpers
     * @param originalField the value to coerce; may be {@code null}
     * @param originalType the type that {@code originalField} was declared with
     * @param destinationType the type to coerce the value to
     * @return the coerced value, or {@code null} when {@code originalField} is {@code null}
     * @throws IllegalArgumentException if the value cannot be coerced to a DATE column, or if
     *     {@code destinationType} is not one of the types handled here
     */
    static Object coerceObject(
            String timezone,
            Object originalField,
            DataType originalType,
            DataType destinationType) {
        // Nulls are type-agnostic and pass through unchanged.
        if (originalField == null) {
            return null;
        }

        if (destinationType instanceof BooleanType) {
            return Boolean.valueOf(originalField.toString());
        }

        // Integral and fractional numeric targets.
        if (destinationType instanceof TinyIntType) {
            return coerceToByte(originalField);
        }
        if (destinationType instanceof SmallIntType) {
            return coerceToShort(originalField);
        }
        if (destinationType instanceof IntType) {
            return coerceToInt(originalField);
        }
        if (destinationType instanceof BigIntType) {
            return coerceToLong(originalField);
        }
        if (destinationType instanceof DecimalType) {
            DecimalType decimalType = (DecimalType) destinationType;
            return coerceToDecimal(
                    originalField, decimalType.getPrecision(), decimalType.getScale());
        }
        if (destinationType instanceof FloatType) {
            return coerceToFloat(originalField);
        }
        if (destinationType instanceof DoubleType) {
            return coerceToDouble(originalField);
        }

        // Fixed- and variable-length character targets share the same conversion.
        if (destinationType instanceof CharType || destinationType instanceof VarCharType) {
            return coerceToString(originalField, originalType);
        }

        // Fixed- and variable-length binary targets share the same conversion.
        if (destinationType instanceof BinaryType || destinationType instanceof VarBinaryType) {
            return coerceToBytes(originalField);
        }

        if (destinationType instanceof DateType) {
            try {
                return coerceToLong(originalField);
            } catch (IllegalArgumentException e) {
                // Re-throw with a DATE-specific message, keeping the original failure as the
                // cause so the underlying conversion error is not lost.
                throw new IllegalArgumentException(
                        String.format("Cannot fit \"%s\" into a DATE column.", originalField), e);
            }
        }

        // For now, TimestampData / ZonedTimestampData / LocalZonedTimestampData has no
        // difference in its internal representation, so there's no need to do any precision
        // conversion when both sides share the same timestamp root.
        if ((destinationType.is(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
                        && originalType.is(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE))
                || (destinationType.is(DataTypeRoot.TIMESTAMP_WITH_TIME_ZONE)
                        && originalType.is(DataTypeRoot.TIMESTAMP_WITH_TIME_ZONE))
                || (destinationType.is(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
                        && originalType.is(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE))) {
            return originalField;
        }

        // Timestamp targets whose root differs from the source go through an explicit conversion.
        if (destinationType instanceof TimestampType) {
            return coerceToTimestamp(originalField, timezone);
        }
        if (destinationType instanceof LocalZonedTimestampType) {
            return coerceToLocalZonedTimestamp(originalField, timezone);
        }
        if (destinationType instanceof ZonedTimestampType) {
            return coerceToZonedTimestamp(originalField, timezone);
        }

        throw new IllegalArgumentException(
                String.format(
                        "Column type \"%s\" doesn't support type coercion to \"%s\"",
                        originalType, destinationType));
    }