public static DataType inferWiderType()

in flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java [514:585]


    public static DataType inferWiderType(DataType lType, DataType rType) {
        // Ignore nullability during data type merge
        boolean nullable = lType.isNullable() || rType.isNullable();
        lType = lType.notNull();
        rType = rType.notNull();

        DataType mergedType;
        if (lType.equals(rType)) {
            // identical type
            mergedType = rType;
        } else if (lType instanceof TimestampType && rType instanceof TimestampType) {
            return DataTypes.TIMESTAMP(
                    Math.max(
                            ((TimestampType) lType).getPrecision(),
                            ((TimestampType) rType).getPrecision()));
        } else if (lType instanceof ZonedTimestampType && rType instanceof ZonedTimestampType) {
            return DataTypes.TIMESTAMP_TZ(
                    Math.max(
                            ((ZonedTimestampType) lType).getPrecision(),
                            ((ZonedTimestampType) rType).getPrecision()));
        } else if (lType instanceof LocalZonedTimestampType
                && rType instanceof LocalZonedTimestampType) {
            return DataTypes.TIMESTAMP_LTZ(
                    Math.max(
                            ((LocalZonedTimestampType) lType).getPrecision(),
                            ((LocalZonedTimestampType) rType).getPrecision()));
        } else if (lType.is(DataTypeFamily.TIMESTAMP) && rType.is(DataTypeFamily.TIMESTAMP)) {
            return DataTypes.TIMESTAMP(TimestampType.MAX_PRECISION);
        } else if (lType.is(DataTypeFamily.INTEGER_NUMERIC)
                && rType.is(DataTypeFamily.INTEGER_NUMERIC)) {
            mergedType = DataTypes.BIGINT();
        } else if (lType.is(DataTypeFamily.CHARACTER_STRING)
                && rType.is(DataTypeFamily.CHARACTER_STRING)) {
            mergedType = DataTypes.STRING();
        } else if (lType.is(DataTypeFamily.APPROXIMATE_NUMERIC)
                && rType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
            mergedType = DataTypes.DOUBLE();
        } else if (lType instanceof DecimalType && rType instanceof DecimalType) {
            // Merge two decimal types
            DecimalType lhsDecimal = (DecimalType) lType;
            DecimalType rhsDecimal = (DecimalType) rType;
            int resultIntDigits =
                    Math.max(
                            lhsDecimal.getPrecision() - lhsDecimal.getScale(),
                            rhsDecimal.getPrecision() - rhsDecimal.getScale());
            int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale());
            Preconditions.checkArgument(
                    resultIntDigits + resultScale <= DecimalType.MAX_PRECISION,
                    String.format(
                            "Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available",
                            lType,
                            rType,
                            resultIntDigits + resultScale,
                            DecimalType.MAX_PRECISION));
            mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale);
        } else if (lType instanceof DecimalType && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
            // Merge decimal and int
            mergedType = mergeExactNumericsIntoDecimal((DecimalType) lType, rType);
        } else if (rType instanceof DecimalType && lType.is(DataTypeFamily.EXACT_NUMERIC)) {
            // Merge decimal and int
            mergedType = mergeExactNumericsIntoDecimal((DecimalType) rType, lType);
        } else {
            throw new IllegalStateException(
                    String.format("Incompatible types: \"%s\" and \"%s\"", lType, rType));
        }

        if (nullable) {
            return mergedType.nullable();
        } else {
            return mergedType.notNull();
        }
    }