in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java [419:444]
/**
 * Brings {@code originFieldSchemaMap} in line with the supplied column definitions.
 *
 * <p>When no map exists yet, a new insertion-ordered map is created and every column is
 * registered through {@code buildFieldSchema}. When a map already exists, only the columns
 * whose name is already a key in it are refreshed (name, Doris type string, comment and
 * default value); columns that are not in the map are ignored.
 *
 * <p>In both cases the {@code firstSchemaChange} and {@code firstLoad} flags are reset to
 * {@code false} afterwards.
 *
 * @param columns iterable JSON node of column descriptors; each descriptor is read for
 *     {@code "name"} and {@code "typeName"}, the optional {@code "length"} and
 *     {@code "scale"}, plus {@code "defaultValueExpression"} and {@code "comment"}
 */
public void fillOriginSchema(JsonNode columns) {
    if (originFieldSchemaMap == null) {
        // Nothing cached yet: build the whole schema from the incoming columns.
        originFieldSchemaMap = new LinkedHashMap<>();
        for (JsonNode columnNode : columns) {
            buildFieldSchema(originFieldSchemaMap, columnNode);
        }
    } else {
        for (JsonNode columnNode : columns) {
            String columnName = columnNode.get("name").asText();
            if (!originFieldSchemaMap.containsKey(columnName)) {
                // Only columns that are already tracked get refreshed.
                continue;
            }

            String sourceTypeName = columnNode.get("typeName").asText();
            JsonNode lengthNode = columnNode.get("length");
            JsonNode scaleNode = columnNode.get("scale");
            // A missing length/scale attribute is treated as 0.
            int columnLength = 0;
            if (lengthNode != null) {
                columnLength = lengthNode.asInt();
            }
            int columnScale = 0;
            if (scaleNode != null) {
                columnScale = scaleNode.asInt();
            }
            String dorisType = MysqlType.toDorisType(sourceTypeName, columnLength, columnScale);

            String rawDefault = extractJsonNode(columnNode, "defaultValueExpression");
            String resolvedDefault = handleDefaultValue(rawDefault);
            String columnComment = extractJsonNode(columnNode, "comment");

            FieldSchema tracked = originFieldSchemaMap.get(columnName);
            tracked.setName(columnName);
            tracked.setTypeString(dorisType);
            tracked.setComment(columnComment);
            tracked.setDefaultValue(resolvedDefault);
        }
    }
    firstSchemaChange = false;
    firstLoad = false;
}