private Map extractRow()

in paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java [230:361]


    /**
     * Converts one row image of a Debezium change event into a map of column name to string
     * value.
     *
     * <p>The columns are taken from {@code root.schema().beforeAndAfterFields()} and visited in
     * that map's iteration order; columns whose value is missing or null in {@code recordRow} are
     * skipped. Values are read with {@code asText()} and then rewritten depending on the field's
     * schema type and logical name (bits, bytes, decimals, temporal types, arrays). After the
     * physical columns, the values of the computed columns and of the metadata columns are
     * appended to the same map.
     *
     * @param recordRow the JSON node holding the row's column values
     * @return a new mutable map of column name to converted value; an empty map when {@code
     *     recordRow} is null according to {@code isNull}
     * @throws IllegalArgumentException if a decimal column carries a value that cannot be parsed
     *     as a {@link BigDecimal}
     */
    private Map<String, String> extractRow(JsonNode recordRow) {
        if (isNull(recordRow)) {
            return new HashMap<>();
        }

        // The schema is mandatory: the field types and logical names drive every conversion below.
        DebeziumEvent.Field schema =
                Preconditions.checkNotNull(
                        root.schema(),
                        "PostgresRecordParser only supports debezium JSON with schema. "
                                + "Please make sure that `includeSchema` is true "
                                + "in the JsonDebeziumDeserializationSchema you created");

        Map<String, DebeziumEvent.Field> fields = schema.beforeAndAfterFields();

        // LinkedHashMap keeps the columns in the order in which they are inserted.
        Map<String, String> resultMap = new LinkedHashMap<>();
        for (Map.Entry<String, DebeziumEvent.Field> field : fields.entrySet()) {
            String fieldName = field.getKey();
            String postgresSqlType = field.getValue().type();
            JsonNode objectValue = recordRow.get(fieldName);
            if (isNull(objectValue)) {
                continue;
            }

            String className = field.getValue().name();
            String oldValue = objectValue.asText();
            // Default: pass the textual value through unchanged.
            String newValue = oldValue;

            if (Bits.LOGICAL_NAME.equals(className)) {
                // transform little-endian form to normal order
                // https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-basic-types
                byte[] littleEndian = Base64.getDecoder().decode(oldValue);
                byte[] bigEndian = new byte[littleEndian.length];
                for (int i = 0; i < littleEndian.length; i++) {
                    bigEndian[i] = littleEndian[littleEndian.length - 1 - i];
                }
                if (typeMapping.containsMode(TO_STRING)) {
                    newValue = StringUtils.bytesToBinaryString(bigEndian);
                } else {
                    newValue = Base64.getEncoder().encodeToString(bigEndian);
                }
            } else if ("bytes".equals(postgresSqlType) && className == null) {
                // binary, varbinary
                // Decode with an explicit charset so the result does not depend on the JVM's
                // platform default encoding.
                newValue =
                        new String(
                                Base64.getDecoder().decode(oldValue),
                                java.nio.charset.StandardCharsets.UTF_8);
            } else if ("bytes".equals(postgresSqlType) && decimalLogicalName().equals(className)) {
                // numeric, decimal
                // The value is kept as-is; parsing only validates that it is a plain number.
                try {
                    new BigDecimal(oldValue);
                } catch (NumberFormatException e) {
                    throw new IllegalArgumentException(
                            "Invalid big decimal value "
                                    + oldValue
                                    + ". Make sure that in the `customConverterConfigs` "
                                    + "of the JsonDebeziumDeserializationSchema you created, set '"
                                    + JsonConverterConfig.DECIMAL_FORMAT_CONFIG
                                    + "' to 'numeric'",
                            e);
                }
            }
            // pay attention to the temporal types
            // https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-temporal-types
            else if (Date.SCHEMA_NAME.equals(className)) {
                // date
                newValue = DateTimeUtils.toLocalDate(Integer.parseInt(oldValue)).toString();
            } else if (Timestamp.SCHEMA_NAME.equals(className)) {
                // timestamp (precision 0-3)
                LocalDateTime localDateTime =
                        DateTimeUtils.toLocalDateTime(Long.parseLong(oldValue), ZoneOffset.UTC);
                newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 3);
            } else if (MicroTimestamp.SCHEMA_NAME.equals(className)) {
                // timestamp (precision 4-6)
                long microseconds = Long.parseLong(oldValue);
                long microsecondsPerSecond = 1_000_000;
                long nanosecondsPerMicros = 1_000;
                long seconds = microseconds / microsecondsPerSecond;
                // May be negative for values before the epoch; Instant.ofEpochSecond normalizes it.
                long nanoAdjustment = (microseconds % microsecondsPerSecond) * nanosecondsPerMicros;

                LocalDateTime localDateTime =
                        Instant.ofEpochSecond(seconds, nanoAdjustment)
                                .atZone(ZoneOffset.UTC)
                                .toLocalDateTime();
                newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
            } else if (ZonedTimestamp.SCHEMA_NAME.equals(className)) {
                // timestamptz
                // The instant is rendered as a local date-time in the configured server time zone.
                LocalDateTime localDateTime =
                        Instant.parse(oldValue).atZone(serverTimeZone).toLocalDateTime();
                newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
            } else if (MicroTime.SCHEMA_NAME.equals(className)) {
                // time, given as a microsecond count and rendered as a local time
                long microseconds = Long.parseLong(oldValue);
                long microsecondsPerSecond = 1_000_000;
                long nanosecondsPerMicros = 1_000;
                long seconds = microseconds / microsecondsPerSecond;
                long nanoAdjustment = (microseconds % microsecondsPerSecond) * nanosecondsPerMicros;

                newValue =
                        Instant.ofEpochSecond(seconds, nanoAdjustment)
                                .atZone(ZoneOffset.UTC)
                                .toLocalTime()
                                .toString();
            } else if ("array".equals(postgresSqlType)) {
                // Every element is converted to its text form and the list is re-serialized as a
                // JSON array of strings.
                ArrayNode arrayNode = (ArrayNode) objectValue;
                List<String> newArrayValues = new ArrayList<>();
                arrayNode
                        .elements()
                        .forEachRemaining(element -> newArrayValues.add(element.asText()));
                try {
                    newValue = objectMapper.writer().writeValueAsString(newArrayValues);
                } catch (JsonProcessingException e) {
                    // Best effort: on failure the column keeps the asText() value of the node.
                    LOG.error("Failed to convert array to JSON.", e);
                }
            }

            resultMap.put(fieldName, newValue);
        }

        // generate values of computed columns
        // Each one is evaluated on the already converted value of the column it references.
        for (ComputedColumn computedColumn : computedColumns) {
            resultMap.put(
                    computedColumn.columnName(),
                    computedColumn.eval(resultMap.get(computedColumn.fieldReference())));
        }

        // Metadata columns are read from the event's source block, not from the row itself.
        for (CdcMetadataConverter metadataConverter : metadataConverters) {
            resultMap.put(
                    metadataConverter.columnName(),
                    metadataConverter.read(root.payload().source()));
        }

        return resultMap;
    }