private Object getValue()

in seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumRowConverter.java [68:226]


    private Object getValue(String fieldName, SeaTunnelDataType<?> dataType, JsonNode value)
            throws IOException {
        SqlType sqlType = dataType.getSqlType();
        if (value == null || value.isNull()) {
            return null;
        }
        switch (sqlType) {
            case BOOLEAN:
                return value.asBoolean();
            case TINYINT:
                return (byte) value.asInt();
            case SMALLINT:
                return (short) value.asInt();
            case INT:
                return value.asInt();
            case BIGINT:
                return value.asLong();
            case FLOAT:
                return value.floatValue();
            case DOUBLE:
                return value.doubleValue();
            case DECIMAL:
                if (value.isNumber()) {
                    return value.decimalValue();
                }
                if (value.isBinary() || value.isTextual()) {
                    try {
                        return new BigDecimal(
                                new BigInteger(value.binaryValue()),
                                ((DecimalType) dataType).getScale());
                    } catch (Exception e) {
                        throw new RuntimeException("Invalid bytes for Decimal field", e);
                    }
                }
                if (value.has(DECIMAL_SCALE_KEY)) {
                    return new BigDecimal(
                            new BigInteger(value.get(DECIMAL_VALUE_KEY).binaryValue()),
                            value.get(DECIMAL_SCALE_KEY).intValue());
                }
                return new BigDecimal(value.asText());
            case STRING:
                return value.asText();
            case BYTES:
                try {
                    return value.binaryValue();
                } catch (IOException e) {
                    throw new RuntimeException("Invalid bytes field", e);
                }
            case DATE:
                String dateStr = value.asText();
                if (value.canConvertToLong()) {
                    return LocalDate.ofEpochDay(Long.parseLong(dateStr));
                }
                DateTimeFormatter dateFormatter = fieldFormatterMap.get(fieldName);
                if (dateFormatter == null) {
                    dateFormatter = DateUtils.matchDateFormatter(dateStr);
                    fieldFormatterMap.put(fieldName, dateFormatter);
                }
                if (dateFormatter == null) {
                    throw new SeaTunnelJsonFormatException(
                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
                            String.format(
                                    "SeaTunnel can not parse this date format [%s] of field [%s]",
                                    dateStr, fieldName));
                }
                return dateFormatter.parse(dateStr).query(TemporalQueries.localDate());
            case TIME:
                String timeStr = value.asText();
                if (value.canConvertToLong()) {
                    long time = Long.parseLong(timeStr);
                    if (timeStr.length() == 8) {
                        time = TimeUnit.SECONDS.toMicros(time);
                    } else if (timeStr.length() == 11) {
                        time = TimeUnit.MILLISECONDS.toMicros(time);
                    }
                    return LocalTime.ofNanoOfDay(time);
                }

                DateTimeFormatter timeFormatter = fieldFormatterMap.get(fieldName);
                if (timeFormatter == null) {
                    timeFormatter = DateUtils.matchDateFormatter(timeStr);
                    fieldFormatterMap.put(fieldName, timeFormatter);
                }
                if (timeFormatter == null) {
                    throw new SeaTunnelJsonFormatException(
                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
                            String.format(
                                    "SeaTunnel can not parse this date format [%s] of field [%s]",
                                    timeStr, fieldName));
                }

                TemporalAccessor parsedTime = timeFormatter.parse(timeStr);
                return parsedTime.query(TemporalQueries.localTime());
            case TIMESTAMP:
                String timestampStr = value.asText();
                if (value.canConvertToLong()) {
                    long timestamp = Long.parseLong(value.toString());
                    if (timestampStr.length() > 16) {
                        timestamp = TimeUnit.NANOSECONDS.toMillis(timestamp);
                    } else if (timestampStr.length() > 13) {
                        timestamp = TimeUnit.MICROSECONDS.toMillis(timestamp);
                    } else if (timestampStr.length() > 10) {
                        // already in milliseconds
                    } else {
                        timestamp = TimeUnit.SECONDS.toMillis(timestamp);
                    }
                    return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneOffset.UTC);
                }

                DateTimeFormatter timestampFormatter = fieldFormatterMap.get(fieldName);
                if (timestampFormatter == null) {
                    timestampFormatter = DateUtils.matchDateFormatter(timestampStr);
                    fieldFormatterMap.put(fieldName, timestampFormatter);
                }
                if (timestampFormatter == null) {
                    throw new SeaTunnelJsonFormatException(
                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
                            String.format(
                                    "SeaTunnel can not parse this date format [%s] of field [%s]",
                                    timestampStr, fieldName));
                }

                TemporalAccessor parsedTimestamp = timestampFormatter.parse(timestampStr);
                LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime());
                LocalDate localDate = parsedTimestamp.query(TemporalQueries.localDate());
                return LocalDateTime.of(localDate, localTime);
            case ARRAY:
                List<Object> arrayValue = new ArrayList<>();
                for (JsonNode o : value) {
                    arrayValue.add(getValue(fieldName, ((ArrayType) dataType).getElementType(), o));
                }
                return arrayValue;
            case MAP:
                Map<Object, Object> mapValue = new LinkedHashMap<>();
                for (Iterator<Map.Entry<String, JsonNode>> it = value.fields(); it.hasNext(); ) {
                    Map.Entry<String, JsonNode> entry = it.next();
                    mapValue.put(
                            entry.getKey(),
                            getValue(null, ((MapType) dataType).getValueType(), entry.getValue()));
                }
                return mapValue;
            case ROW:
                SeaTunnelRowType rowType = (SeaTunnelRowType) dataType;
                SeaTunnelRow row = new SeaTunnelRow(rowType.getTotalFields());
                for (int i = 0; i < rowType.getTotalFields(); i++) {
                    row.setField(
                            i,
                            getValue(
                                    rowType.getFieldName(i),
                                    rowType.getFieldType(i),
                                    value.has(rowType.getFieldName(i))
                                            ? value.get(rowType.getFieldName(i))
                                            : null));
                }
                return row;
            default:
                throw new UnsupportedOperationException("Unsupported type: " + sqlType);
        }
    }