in flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java [250:331]
private static SchemaBuilder convertCDCDataTypeToDebeziumDataType(Column column) {
org.apache.flink.cdc.common.types.DataType columnType = column.getType();
final SchemaBuilder field;
switch (columnType.getTypeRoot()) {
case TINYINT:
case SMALLINT:
field = SchemaBuilder.int16();
break;
case INTEGER:
field = SchemaBuilder.int32();
break;
case BIGINT:
field = SchemaBuilder.int64();
break;
case DECIMAL:
final int decimalPrecision = ((DecimalType) columnType).getPrecision();
final int decimalScale = ((DecimalType) columnType).getScale();
field =
Decimal.builder(decimalScale)
.parameter(
"connect.decimal.precision",
String.valueOf(decimalPrecision));
break;
case BOOLEAN:
field = SchemaBuilder.bool();
break;
case FLOAT:
field = SchemaBuilder.float32();
break;
case DOUBLE:
field = SchemaBuilder.float64();
break;
case DATE:
field = SchemaBuilder.int32().name(Date.SCHEMA_NAME).version(1);
break;
case TIME_WITHOUT_TIME_ZONE:
field = SchemaBuilder.int64().name(MicroTime.SCHEMA_NAME).version(1);
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_TIME_ZONE:
int timestampPrecisionPrecision = ((TimestampType) columnType).getPrecision();
if (timestampPrecisionPrecision > 3) {
field = SchemaBuilder.int64().name(MicroTimestamp.SCHEMA_NAME).version(1);
} else {
field = SchemaBuilder.int64().name(Timestamp.SCHEMA_NAME).version(1);
}
break;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
field = SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).version(1);
break;
case BINARY:
case VARBINARY:
field =
SchemaBuilder.bytes()
.name(Bits.LOGICAL_NAME)
.parameter(
Bits.LENGTH_FIELD,
Integer.toString(
org.apache.flink.cdc.common.types.DataTypes
.getLength(columnType)
.orElse(0)))
.version(1);
break;
case CHAR:
case VARCHAR:
default:
field = SchemaBuilder.string();
}
if (columnType.isNullable()) {
field.optional();
} else {
field.required();
}
if (column.getDefaultValueExpression() != null) {
field.defaultValue(column.getDefaultValueExpression());
}
if (column.getComment() != null) {
field.doc(column.getComment());
}
return field;
}