in paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java [438:516]
/**
 * Converts an Avro {@link Schema} into the matching {@link DataType}.
 *
 * <p>A logical type attached to the schema takes precedence over the underlying Avro type. If
 * no logical type is present, the conversion is driven by {@link Schema#getType()}; records,
 * arrays and maps are converted recursively.
 *
 * <p>Unions are only supported in the "optional value" form, i.e. exactly two branches of
 * which one is {@code null}. The result is then the nullable version of the other branch.
 *
 * @param schema the Avro schema to convert
 * @return the data type corresponding to {@code schema}
 * @throws UnsupportedOperationException if the schema uses a logical type, an Avro type or a
 *     union shape that is not handled here
 * @throws IllegalStateException if a two-branch union consists of {@code null} branches only
 */
private static DataType fromDebeziumAvroType(Schema schema) {
    LogicalType logicalType = schema.getLogicalType();
    if (logicalType != null) {
        // Logical types refine the physical type, so they must be inspected first.
        if (logicalType instanceof LogicalTypes.Date) {
            return DataTypes.DATE();
        } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
            return DataTypes.TIMESTAMP_MILLIS();
        } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
            return DataTypes.TIMESTAMP();
        } else if (logicalType instanceof LogicalTypes.Decimal) {
            LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
            return DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale());
        } else if (logicalType instanceof LogicalTypes.TimeMillis) {
            return DataTypes.TIME(3);
        } else if (logicalType instanceof LogicalTypes.TimeMicros) {
            return DataTypes.TIME(6);
        } else if (logicalType instanceof LogicalTypes.LocalTimestampMicros) {
            return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE();
        } else if (logicalType instanceof LogicalTypes.LocalTimestampMillis) {
            return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3);
        } else {
            throw new UnsupportedOperationException(
                    String.format("Don't support logical avro type '%s' yet.", logicalType));
        }
    }

    Schema.Type avroType = schema.getType();
    switch (avroType) {
        case BOOLEAN:
            return DataTypes.BOOLEAN();
        case BYTES:
        case FIXED:
            return DataTypes.BYTES();
        case DOUBLE:
            return DataTypes.DOUBLE();
        case FLOAT:
            return DataTypes.FLOAT();
        case INT:
            return DataTypes.INT();
        case LONG:
            return DataTypes.BIGINT();
        case STRING:
            return DataTypes.STRING();
        case RECORD:
            // Each Avro field becomes a row field; the Avro position is used as the field id.
            List<DataField> fields = new ArrayList<>();
            for (Schema.Field field : schema.getFields()) {
                DataType fieldType = fromDebeziumAvroType(field.schema());
                fields.add(DataTypes.FIELD(field.pos(), field.name(), fieldType, field.doc()));
            }
            return DataTypes.ROW(fields.toArray(new DataField[0]));
        case ARRAY:
            Schema elementSchema = schema.getElementType();
            DataType elementType = fromDebeziumAvroType(elementSchema);
            return DataTypes.ARRAY(elementType);
        case MAP:
            // Only the value schema is converted; the key type is always STRING.
            DataType valueType = fromDebeziumAvroType(schema.getValueType());
            return DataTypes.MAP(DataTypes.STRING(), valueType);
        case UNION:
            List<Schema> unionTypes = schema.getTypes();
            // Check if it's a nullable type union
            if (unionTypes.size() == 2) {
                Schema first = unionTypes.get(0);
                Schema second = unionTypes.get(1);
                // Identify the null branch by its type. Comparing against a freshly created
                // null schema via equals() would also compare schema properties, so a null
                // branch carrying custom properties would not be recognised.
                boolean firstIsNull = first.getType() == Schema.Type.NULL;
                boolean secondIsNull = second.getType() == Schema.Type.NULL;
                if (firstIsNull || secondIsNull) {
                    if (firstIsNull && secondIsNull) {
                        throw new IllegalStateException(
                                "Union type does not contain a non-null type");
                    }
                    Schema actualSchema = firstIsNull ? second : first;
                    // Return nullable version of the non-null type
                    return fromDebeziumAvroType(actualSchema).copy(true);
                }
            }
            // Handle generic unions or throw an exception
            throw new UnsupportedOperationException("Generic unions are not supported");
        default:
            throw new UnsupportedOperationException(
                    String.format("Don't support avro type '%s' yet.", avroType));
    }
}