private static SchemaBuilder convertCDCDataTypeToDebeziumDataType()

in flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java [250:331]


    /**
     * Builds the Kafka Connect schema that describes a CDC column in the Debezium JSON schema.
     *
     * <p>The type root of the column selects the Connect primitive or Debezium logical type; any
     * root without a dedicated branch (including CHAR and VARCHAR) is emitted as a plain string.
     * Nullability and the column comment are copied onto the resulting builder. The default value
     * expression is copied only for string-typed schemas, see the note in the method body.
     *
     * @param column the CDC column to describe
     * @return a schema builder carrying the type, optionality, default value (string schemas
     *     only) and documentation of the column
     */
    private static SchemaBuilder convertCDCDataTypeToDebeziumDataType(Column column) {
        org.apache.flink.cdc.common.types.DataType columnType = column.getType();
        final SchemaBuilder field;
        switch (columnType.getTypeRoot()) {
            case TINYINT:
            case SMALLINT:
                field = SchemaBuilder.int16();
                break;
            case INTEGER:
                field = SchemaBuilder.int32();
                break;
            case BIGINT:
                field = SchemaBuilder.int64();
                break;
            case DECIMAL:
                final int decimalPrecision = ((DecimalType) columnType).getPrecision();
                final int decimalScale = ((DecimalType) columnType).getScale();
                field =
                        Decimal.builder(decimalScale)
                                .parameter(
                                        "connect.decimal.precision",
                                        String.valueOf(decimalPrecision));
                break;
            case BOOLEAN:
                field = SchemaBuilder.bool();
                break;
            case FLOAT:
                field = SchemaBuilder.float32();
                break;
            case DOUBLE:
                field = SchemaBuilder.float64();
                break;
            case DATE:
                field = SchemaBuilder.int32().name(Date.SCHEMA_NAME).version(1);
                break;
            case TIME_WITHOUT_TIME_ZONE:
                field = SchemaBuilder.int64().name(MicroTime.SCHEMA_NAME).version(1);
                break;
            case TIMESTAMP_WITHOUT_TIME_ZONE:
            case TIMESTAMP_WITH_TIME_ZONE:
                // The two roots are backed by different type classes (TimestampType and
                // ZonedTimestampType), so a single cast to TimestampType would fail with a
                // ClassCastException for TIMESTAMP_WITH_TIME_ZONE columns.
                final int timestampPrecision =
                        columnType instanceof TimestampType
                                ? ((TimestampType) columnType).getPrecision()
                                : ((org.apache.flink.cdc.common.types.ZonedTimestampType)
                                                columnType)
                                        .getPrecision();
                // Precision above milliseconds needs the microsecond logical type.
                if (timestampPrecision > 3) {
                    field = SchemaBuilder.int64().name(MicroTimestamp.SCHEMA_NAME).version(1);
                } else {
                    field = SchemaBuilder.int64().name(Timestamp.SCHEMA_NAME).version(1);
                }
                break;
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                field = SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).version(1);
                break;
            case BINARY:
            case VARBINARY:
                field =
                        SchemaBuilder.bytes()
                                .name(Bits.LOGICAL_NAME)
                                .parameter(
                                        Bits.LENGTH_FIELD,
                                        Integer.toString(
                                                org.apache.flink.cdc.common.types.DataTypes
                                                        .getLength(columnType)
                                                        .orElse(0)))
                                .version(1);
                break;
            case CHAR:
            case VARCHAR:
            default:
                field = SchemaBuilder.string();
        }

        if (columnType.isNullable()) {
            field.optional();
        } else {
            field.required();
        }

        // The default value expression is a raw String. SchemaBuilder#defaultValue validates the
        // value against the schema type and rejects a String for every non-string schema with a
        // SchemaBuilderException, which would abort schema generation for the whole table.
        // Only string schemas can carry the expression as-is; for other types it is omitted.
        final String defaultValueExpression = column.getDefaultValueExpression();
        if (defaultValueExpression != null
                && field.type() == org.apache.kafka.connect.data.Schema.Type.STRING) {
            field.defaultValue(defaultValueExpression);
        }

        if (column.getComment() != null) {
            field.doc(column.getComment());
        }
        return field;
    }