in paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java [300:444]
/**
 * Converts one row image of a Debezium JSON record into a map from column name to the string
 * form of its value.
 *
 * <p>The column types are taken from the {@code before}/{@code after} entries of the record's
 * {@code schema} node (read from the {@code root} field of this parser). For every column the
 * schema type and, when present, the logical type name decide how the raw JSON value is
 * rendered. Columns whose value is absent or JSON {@code null} in {@code recordRow} are left
 * out of the result. After all columns are converted, every configured computed column is
 * evaluated against the converted value of the field it references and added to the result.
 *
 * @param recordRow the row image to convert
 * @return a mutable map of column name to converted string value; an empty map when {@code
 *     recordRow} converts to a {@code null} map
 * @throws IllegalArgumentException if a decimal column does not hold a parsable number, or a
 *     point/geometry column can not be converted
 */
private Map<String, String> extractRow(JsonNode recordRow) {
    JsonNode schema =
            Preconditions.checkNotNull(
                    root.get("schema"),
                    "MySqlDebeziumJsonEventParser only supports debezium JSON with schema. "
                            + "Please make sure that `includeSchema` is true "
                            + "in the JsonDebeziumDeserializationSchema you created");

    // column name -> schema type, and column name -> logical type name (only when declared)
    Map<String, String> mySqlFieldTypes = new HashMap<>();
    Map<String, String> fieldClassNames = new HashMap<>();
    for (JsonNode elementNode : schema.get("fields")) {
        String field = elementNode.get("field").asText();
        if (!"before".equals(field) && !"after".equals(field)) {
            continue;
        }
        for (JsonNode innerElementNode : elementNode.get("fields")) {
            String fieldName = innerElementNode.get("field").asText();
            mySqlFieldTypes.put(fieldName, innerElementNode.get("type").asText());
            JsonNode nameNode = innerElementNode.get("name");
            if (nameNode != null) {
                fieldClassNames.put(fieldName, nameNode.asText());
            }
        }
    }

    // the geometry, point type can not be converted to string, so we convert it to Object
    // first.
    Map<String, Object> jsonMap =
            objectMapper.convertValue(recordRow, new TypeReference<Map<String, Object>>() {});
    if (jsonMap == null) {
        return new HashMap<>();
    }

    Map<String, String> resultMap = new HashMap<>();
    for (Map.Entry<String, String> field : mySqlFieldTypes.entrySet()) {
        String fieldName = field.getKey();
        String mySqlType = field.getValue();
        Object objectValue = jsonMap.get(fieldName);
        if (objectValue == null) {
            // absent or null columns are not put into the result
            continue;
        }

        String className = fieldClassNames.get(fieldName);
        String oldValue = objectValue.toString();
        String newValue = oldValue;

        // pay attention to the temporal types
        // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-temporal-types
        if ("bytes".equals(mySqlType) && className == null) {
            // MySQL binary, varbinary, blob
            // Decode with an explicit charset so that the result does not depend on the
            // default charset of the JVM this job happens to run on.
            newValue =
                    new String(
                            Base64.getDecoder().decode(oldValue),
                            java.nio.charset.StandardCharsets.UTF_8);
        } else if ("bytes".equals(mySqlType) && Decimal.LOGICAL_NAME.equals(className)) {
            // MySQL numeric, fixed, decimal
            // The value is kept as is; constructing the BigDecimal only validates it.
            try {
                new BigDecimal(oldValue);
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException(
                        "Invalid big decimal value "
                                + oldValue
                                + ". Make sure that in the `customConverterConfigs` "
                                + "of the JsonDebeziumDeserializationSchema you created, set '"
                                + JsonConverterConfig.DECIMAL_FORMAT_CONFIG
                                + "' to 'numeric'",
                        e);
            }
        } else if (Date.SCHEMA_NAME.equals(className)) {
            // MySQL date
            newValue = DateTimeUtils.toLocalDate(Integer.parseInt(oldValue)).toString();
        } else if (Timestamp.SCHEMA_NAME.equals(className)) {
            // MySQL datetime (precision 0-3)
            // display value of datetime is not affected by timezone, see
            // https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
            // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
            // for implementation
            LocalDateTime localDateTime =
                    DateTimeUtils.toLocalDateTime(Long.parseLong(oldValue), ZoneOffset.UTC);
            newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 3);
        } else if (MicroTimestamp.SCHEMA_NAME.equals(className)) {
            // MySQL datetime (precision 4-6)
            // display value of datetime is not affected by timezone, see
            // https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
            // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
            // for implementation
            LocalDateTime localDateTime = epochMicrosToUtcDateTime(Long.parseLong(oldValue));
            newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
        } else if (ZonedTimestamp.SCHEMA_NAME.equals(className)) {
            // MySQL timestamp
            // display value of timestamp is affected by timezone, see
            // https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
            // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
            // for implementation
            LocalDateTime localDateTime =
                    Instant.parse(oldValue).atZone(serverTimeZone).toLocalDateTime();
            newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
        } else if (MicroTime.SCHEMA_NAME.equals(className)) {
            // microsecond-precision time value (presumably MySQL time, confirm against the
            // debezium temporal types documentation); only the time-of-day part is kept
            newValue =
                    epochMicrosToUtcDateTime(Long.parseLong(oldValue)).toLocalTime().toString();
        } else if (Point.LOGICAL_NAME.equals(className)
                || Geometry.LOGICAL_NAME.equals(className)) {
            // read the structured node again: the string form of the converted Object is not
            // usable here, the "wkb" member has to be read as binary
            JsonNode jsonNode = recordRow.get(fieldName);
            try {
                byte[] wkb = jsonNode.get("wkb").binaryValue();
                newValue = MySqlTypeUtils.convertWkbArray(wkb);
            } catch (Exception e) {
                throw new IllegalArgumentException(
                        String.format("Failed to convert %s to geometry JSON.", jsonNode), e);
            }
        }

        resultMap.put(fieldName, newValue);
    }

    // generate values of computed columns
    for (ComputedColumn computedColumn : computedColumns) {
        resultMap.put(
                computedColumn.columnName(),
                computedColumn.eval(resultMap.get(computedColumn.fieldReference())));
    }

    return resultMap;
}

/** Converts a count of microseconds since the epoch to the matching date-time in UTC. */
private static LocalDateTime epochMicrosToUtcDateTime(long microseconds) {
    long microsecondsPerSecond = 1_000_000;
    long nanosecondsPerMicros = 1_000;
    long seconds = microseconds / microsecondsPerSecond;
    long nanoAdjustment = (microseconds % microsecondsPerSecond) * nanosecondsPerMicros;
    return Instant.ofEpochSecond(seconds, nanoAdjustment)
            .atZone(ZoneOffset.UTC)
            .toLocalDateTime();
}