in paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java [140:261]
/**
 * Converts the raw string form of a Debezium field into the string representation returned to
 * the caller, choosing the conversion from the Debezium type and logical (schema) name.
 *
 * <ul>
 *   <li>{@code Bits}: the Base64 payload is decoded, its byte order is reversed, and the result
 *       is rendered as a binary string when the type mapping contains {@code TO_STRING},
 *       otherwise re-encoded as Base64.
 *   <li>{@code bytes} without a logical name: the Base64 payload is decoded and interpreted as
 *       UTF-8 text.
 *   <li>{@code bytes} with the decimal logical name: the value is only validated as a {@link
 *       BigDecimal} and returned unchanged.
 *   <li>{@code Date}, {@code Timestamp}, {@code MicroTimestamp}, {@code ZonedTimestamp}, {@code
 *       MicroTime}: the value is parsed and formatted as a date, date-time or time string.
 *   <li>{@code Point} / {@code Geometry}: the bytes supplied by {@code geometryGetter} are
 *       converted by {@code MySqlTypeUtils.convertWkbArray}.
 *   <li>Avro records/arrays, maps and lists passed as {@code origin}: converted to a
 *       JSON-compatible object and serialized to JSON.
 *   <li>Anything else: {@code rawValue} is returned as is.
 * </ul>
 *
 * @param rawValue the raw value as a string; {@code null} yields {@code null}
 * @param debeziumType the Debezium type name of the field (only {@code "bytes"} is inspected)
 * @param className the logical/schema name of the field, may be {@code null}
 * @param typeMapping type mapping options; consulted for the {@code TO_STRING} mode
 * @param geometryGetter supplies the geometry bytes; only invoked for point/geometry fields
 * @param origin the original (non-string) value; only inspected for structured types
 * @param serverTimeZone the zone used to render {@code ZonedTimestamp} values
 * @return the transformed value, or {@code null} if {@code rawValue} is {@code null}
 * @throws IllegalArgumentException if a decimal value is not a valid number or a geometry value
 *     cannot be converted
 * @throws RuntimeException if a structured value cannot be serialized to JSON
 */
public static String transformRawValue(
        @Nullable String rawValue,
        String debeziumType,
        @Nullable String className,
        TypeMapping typeMapping,
        Supplier<ByteBuffer> geometryGetter,
        Object origin,
        ZoneId serverTimeZone) {
    if (rawValue == null) {
        return null;
    }

    String transformed = rawValue;
    if (Bits.LOGICAL_NAME.equals(className)) {
        // transform little-endian form to normal order
        // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-data-types
        byte[] littleEndian = Base64.getDecoder().decode(rawValue);
        byte[] bigEndian = new byte[littleEndian.length];
        for (int i = 0; i < littleEndian.length; i++) {
            bigEndian[i] = littleEndian[littleEndian.length - 1 - i];
        }
        if (typeMapping.containsMode(TO_STRING)) {
            transformed = StringUtils.bytesToBinaryString(bigEndian);
        } else {
            transformed = Base64.getEncoder().encodeToString(bigEndian);
        }
    } else if ("bytes".equals(debeziumType) && className == null) {
        // MySQL binary, varbinary, blob
        // Decode with an explicit charset so the result does not depend on the JVM's
        // platform-default encoding (which differs between hosts and JDK versions).
        transformed =
                new String(
                        Base64.getDecoder().decode(rawValue),
                        java.nio.charset.StandardCharsets.UTF_8);
    } else if ("bytes".equals(debeziumType) && decimalLogicalName().equals(className)) {
        // MySQL numeric, fixed, decimal
        // The value is kept as is; parsing only verifies that it is a plain number.
        try {
            new BigDecimal(rawValue);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Invalid big decimal value "
                            + rawValue
                            + ". Make sure that in the `customConverterConfigs` "
                            + "of the JsonDebeziumDeserializationSchema you created, set '"
                            + JsonConverterConfig.DECIMAL_FORMAT_CONFIG
                            + "' to 'numeric'",
                    e);
        }
    } else if (Date.SCHEMA_NAME.equals(className)) {
        // pay attention to the temporal types
        // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-temporal-types
        // MySQL date
        transformed = DateTimeUtils.toLocalDate(Integer.parseInt(rawValue)).toString();
    } else if (Timestamp.SCHEMA_NAME.equals(className)) {
        // MySQL datetime (precision 0-3)
        // display value of datetime is not affected by timezone, see
        // https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
        // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
        // for implementation
        LocalDateTime localDateTime =
                DateTimeUtils.toLocalDateTime(Long.parseLong(rawValue), ZoneOffset.UTC);
        transformed = DateTimeUtils.formatLocalDateTime(localDateTime, 3);
    } else if (MicroTimestamp.SCHEMA_NAME.equals(className)) {
        // MySQL datetime (precision 4-6)
        long microseconds = Long.parseLong(rawValue);
        final long microsecondsPerSecond = 1_000_000;
        final long nanosecondsPerMicros = 1_000;
        long seconds = microseconds / microsecondsPerSecond;
        long nanoAdjustment = (microseconds % microsecondsPerSecond) * nanosecondsPerMicros;
        // display value of datetime is not affected by timezone, see
        // https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
        // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
        // for implementation
        LocalDateTime localDateTime =
                Instant.ofEpochSecond(seconds, nanoAdjustment)
                        .atZone(ZoneOffset.UTC)
                        .toLocalDateTime();
        transformed = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
    } else if (ZonedTimestamp.SCHEMA_NAME.equals(className)) {
        // MySQL timestamp
        // display value of timestamp is affected by timezone, see
        // https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
        // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
        // for implementation
        LocalDateTime localDateTime =
                Instant.parse(rawValue).atZone(serverTimeZone).toLocalDateTime();
        transformed = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
    } else if (MicroTime.SCHEMA_NAME.equals(className)) {
        // Time of day expressed in microseconds; rendered via LocalTime#toString.
        long microseconds = Long.parseLong(rawValue);
        final long microsecondsPerSecond = 1_000_000;
        final long nanosecondsPerMicros = 1_000;
        long seconds = microseconds / microsecondsPerSecond;
        long nanoAdjustment = (microseconds % microsecondsPerSecond) * nanosecondsPerMicros;
        transformed =
                Instant.ofEpochSecond(seconds, nanoAdjustment)
                        .atZone(ZoneOffset.UTC)
                        .toLocalTime()
                        .toString();
    } else if (Point.LOGICAL_NAME.equals(className)
            || Geometry.LOGICAL_NAME.equals(className)) {
        // The geometry bytes come from the supplier, not from rawValue.
        try {
            transformed = MySqlTypeUtils.convertWkbArray(geometryGetter.get());
        } catch (Exception e) {
            throw new IllegalArgumentException(
                    String.format("Failed to convert %s to geometry JSON.", rawValue), e);
        }
    } else if ((origin instanceof GenericData.Record)
            || (origin instanceof GenericData.Array)
            || (origin instanceof Map)
            || (origin instanceof List)) {
        // Structured values are serialized to JSON from the original object.
        Object convertedObject = convertAvroObjectToJsonCompatible(origin);
        try {
            transformed = OBJECT_MAPPER.writer().writeValueAsString(convertedObject);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(
                    String.format("Failed to convert %s to JSON.", origin), e);
        }
    }
    return transformed;
}