in paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java [149:206]
/**
 * Translates a Debezium schema field into the matching Paimon {@link DataType}.
 *
 * <p>The field's type string picks the base type. For {@code int32}, {@code int64} and
 * {@code bytes} the field's name is additionally compared against known logical/schema names
 * (date, micro-timestamp, micro-time, decimal, bits) to select a more specific type. Any type
 * string that is not listed below is mapped to {@code STRING}.
 *
 * @param field the Debezium field descriptor to translate
 * @return the Paimon data type chosen for {@code field}
 */
private DataType extractFieldType(DebeziumEvent.Field field) {
    switch (field.type()) {
            // ---- scalar types that map one-to-one ----
        case "boolean":
            return DataTypes.BOOLEAN();
        case "string":
            return DataTypes.STRING();
        case "int8":
            return DataTypes.TINYINT();
        case "int16":
            return DataTypes.SMALLINT();
        case "float32":
        case "float":
            return DataTypes.FLOAT();
        case "double":
        case "float64":
            return DataTypes.DOUBLE();

            // ---- integer types that may carry a temporal schema name ----
        case "int32":
            return Date.SCHEMA_NAME.equals(field.name()) ? DataTypes.DATE() : DataTypes.INT();
        case "int64":
            if (MicroTimestamp.SCHEMA_NAME.equals(field.name())) {
                return DataTypes.TIMESTAMP(6);
            }
            if (MicroTime.SCHEMA_NAME.equals(field.name())) {
                return DataTypes.TIME(6);
            }
            return DataTypes.BIGINT();

            // ---- byte payloads: decimal, bit string, or raw bytes ----
        case "bytes":
            {
                if (decimalLogicalName().equals(field.name())) {
                    // Precision and scale come from the field's parameters.
                    // NOTE(review): assumes both parameters are present — confirm.
                    final int decimalPrecision =
                            field.parameters().get("connect.decimal.precision").asInt();
                    final int decimalScale = field.parameters().get("scale").asInt();
                    return DataTypes.DECIMAL(decimalPrecision, decimalScale);
                }

                if (Bits.LOGICAL_NAME.equals(field.name())) {
                    // NOTE(review): assumes the "length" parameter is present — confirm.
                    final String rawBitLength = field.parameters().get("length").asText();
                    // A blank length is handled the same way as a single bit.
                    if (StringUtils.isNullOrWhitespaceOnly(rawBitLength)) {
                        return DataTypes.BOOLEAN();
                    }

                    final int bitLength = Integer.parseInt(rawBitLength);
                    if (bitLength == 1) {
                        return DataTypes.BOOLEAN();
                    }

                    // Round the bit count up to whole bytes; Integer.MAX_VALUE is divided
                    // directly because adding 7 to it would overflow int.
                    final int byteLength =
                            bitLength == Integer.MAX_VALUE ? bitLength / 8 : (bitLength + 7) / 8;
                    return DataTypes.BINARY(byteLength);
                }

                // Name matched neither the decimal nor the bits logical name
                // (this includes a null name): treat as plain bytes.
                return DataTypes.BYTES();
            }

            // ---- composite types, represented with string elements ----
        case "array":
            return DataTypes.ARRAY(DataTypes.STRING());
        case "struct":
        case "map":
            return DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING());

        default:
            // Anything unrecognized is represented as a string.
            return DataTypes.STRING();
    }
}