in paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java [230:361]
private Map<String, String> extractRow(JsonNode recordRow) {
if (isNull(recordRow)) {
return new HashMap<>();
}
DebeziumEvent.Field schema =
Preconditions.checkNotNull(
root.schema(),
"PostgresRecordParser only supports debezium JSON with schema. "
+ "Please make sure that `includeSchema` is true "
+ "in the JsonDebeziumDeserializationSchema you created");
Map<String, DebeziumEvent.Field> fields = schema.beforeAndAfterFields();
LinkedHashMap<String, String> resultMap = new LinkedHashMap<>();
for (Map.Entry<String, DebeziumEvent.Field> field : fields.entrySet()) {
String fieldName = field.getKey();
String postgresSqlType = field.getValue().type();
JsonNode objectValue = recordRow.get(fieldName);
if (isNull(objectValue)) {
continue;
}
String className = field.getValue().name();
String oldValue = objectValue.asText();
String newValue = oldValue;
if (Bits.LOGICAL_NAME.equals(className)) {
// transform little-endian form to normal order
// https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-basic-types
byte[] littleEndian = Base64.getDecoder().decode(oldValue);
byte[] bigEndian = new byte[littleEndian.length];
for (int i = 0; i < littleEndian.length; i++) {
bigEndian[i] = littleEndian[littleEndian.length - 1 - i];
}
if (typeMapping.containsMode(TO_STRING)) {
newValue = StringUtils.bytesToBinaryString(bigEndian);
} else {
newValue = Base64.getEncoder().encodeToString(bigEndian);
}
} else if (("bytes".equals(postgresSqlType) && className == null)) {
// binary, varbinary
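// byte values are base64-encoded by the Debezium JSON converter; decode them back here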
newValue = new String(Base64.getDecoder().decode(oldValue));
} else if ("bytes".equals(postgresSqlType) && decimalLogicalName().equals(className)) {
// numeric, decimal
try {
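// validation only: the parsed BigDecimal is discarded; a parse failure means the
// decimal was not serialized as a plain numeric string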
new BigDecimal(oldValue);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
"Invalid big decimal value "
+ oldValue
+ ". Make sure that in the `customConverterConfigs` "
+ "of the JsonDebeziumDeserializationSchema you created, set '"
+ JsonConverterConfig.DECIMAL_FORMAT_CONFIG
+ "' to 'numeric'",
e);
}
}
// pay attention to the temporal types
// https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-temporal-types
else if (Date.SCHEMA_NAME.equals(className)) {
// date
newValue = DateTimeUtils.toLocalDate(Integer.parseInt(oldValue)).toString();
} else if (Timestamp.SCHEMA_NAME.equals(className)) {
// timestamp (precision 0-3)
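// value is the number of milliseconds since epoch, without timezone information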
LocalDateTime localDateTime =
DateTimeUtils.toLocalDateTime(Long.parseLong(oldValue), ZoneOffset.UTC);
newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 3);
} else if (MicroTimestamp.SCHEMA_NAME.equals(className)) {
// timestamp (precision 4-6)
long microseconds = Long.parseLong(oldValue);
long microsecondsPerSecond = 1_000_000;
long nanosecondsPerMicros = 1_000;
long seconds = microseconds / microsecondsPerSecond;
long nanoAdjustment = (microseconds % microsecondsPerSecond) * nanosecondsPerMicros;
LocalDateTime localDateTime =
Instant.ofEpochSecond(seconds, nanoAdjustment)
.atZone(ZoneOffset.UTC)
.toLocalDateTime();
newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
} else if (ZonedTimestamp.SCHEMA_NAME.equals(className)) {
// timestamptz
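// Debezium emits timestamptz as an ISO-8601 string; convert it to the configured server time zone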
LocalDateTime localDateTime =
Instant.parse(oldValue).atZone(serverTimeZone).toLocalDateTime();
newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
} else if (MicroTime.SCHEMA_NAME.equals(className)) {
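// time: Debezium represents the value as microseconds past midnight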
long microseconds = Long.parseLong(oldValue);
long microsecondsPerSecond = 1_000_000;
long nanosecondsPerMicros = 1_000;
long seconds = microseconds / microsecondsPerSecond;
long nanoAdjustment = (microseconds % microsecondsPerSecond) * nanosecondsPerMicros;
newValue =
Instant.ofEpochSecond(seconds, nanoAdjustment)
.atZone(ZoneOffset.UTC)
.toLocalTime()
.toString();
} else if ("array".equals(postgresSqlType)) {
ArrayNode arrayNode = (ArrayNode) objectValue;
List<String> newArrayValues = new ArrayList<>();
arrayNode.forEach(element -> newArrayValues.add(element.asText()));
try {
newValue = objectMapper.writer().writeValueAsString(newArrayValues);
} catch (JsonProcessingException e) {
LOG.error("Failed to convert array to JSON.", e);
}
}
resultMap.put(fieldName, newValue);
}
// generate values of computed columns
for (ComputedColumn computedColumn : computedColumns) {
resultMap.put(
computedColumn.columnName(),
computedColumn.eval(resultMap.get(computedColumn.fieldReference())));
}
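// generate values of metadata columns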
for (CdcMetadataConverter metadataConverter : metadataConverters) {
resultMap.put(
metadataConverter.columnName(),
metadataConverter.read(root.payload().source()));
}
return resultMap;
}