in flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java [159:194]
private void formatSpecialFieldData(JsonNode logData) {
logData.fieldNames()
.forEachRemaining(
fieldName -> {
JsonNode fieldNode = logData.get(fieldName);
if (fieldNode.isObject() && fieldNode.size() == 1) {
String fieldKey = fieldNode.fieldNames().next();
if (specialFields.contains(fieldKey)) {
switch (fieldKey) {
case DATE_FIELD:
case TIMESTAMP_FIELD:
JsonNode jsonNode = fieldNode.get(fieldKey);
long timestamp =
fieldKey.equals(TIMESTAMP_FIELD)
? jsonNode.get("t").asLong() * 1000L
: jsonNode.asLong();
String formattedDate =
MongoDateConverter.convertTimestampToString(
timestamp);
((ObjectNode) logData).put(fieldName, formattedDate);
break;
case DECIMAL_FIELD:
String numberDecimal =
fieldNode.get(DECIMAL_FIELD).asText();
((ObjectNode) logData).put(fieldName, numberDecimal);
break;
case LONG_FIELD:
long longFiled = fieldNode.get(LONG_FIELD).asLong();
((ObjectNode) logData).put(fieldName, longFiled);
break;
}
}
}
});
}