in connectors/rocketmq-connect-doris/src/main/java/org/apache/rocketmq/connect/doris/sink/metadata/FieldsMetadata.java [93:162]
/**
 * Derives the key, non-key and overall field metadata for a sink table from the
 * record's key and value schemas.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Reject a non-null value schema whose field type is not {@code STRUCT}.</li>
 *   <li>Collect primary-key fields according to {@code pkMode}: nothing for
 *       {@code NONE}; otherwise delegate to {@code extractRecordKeyPk} or
 *       {@code extractRecordValuePk}, which populate the key-name set and the
 *       field map.</li>
 *   <li>Add every value-schema field that is not already a key field and that
 *       passes the whitelist (an empty whitelist admits every field).</li>
 *   <li>Fail if no field at all was collected.</li>
 *   <li>Order the collected fields: first those present in the value schema, in
 *       schema order; then any remaining ones sorted by name.</li>
 * </ol>
 *
 * @param tableName          table name; forwarded to the primary-key helpers and used in the
 *                           error message
 * @param pkMode             primary-key mode that selects how key fields are collected
 * @param configuredPkFields configured primary-key field names, forwarded to the helpers
 * @param fieldsWhitelist    value field names to keep; when empty, no filtering is applied
 * @param keySchema          record key schema, only used in {@code RECORD_KEY} mode
 * @param schema             record value schema; may be {@code null}
 * @param headers            record headers, only forwarded in {@code RECORD_VALUE} mode
 * @return metadata holding the key field names, the non-key field names and the ordered
 *         field map
 * @throws ConnectException if the value schema is not a struct, the primary-key mode is
 *                          not handled, or no fields could be collected
 */
public static FieldsMetadata extract(
    final String tableName,
    final DorisSinkConfig.PrimaryKeyMode pkMode,
    final List<String> configuredPkFields,
    final Set<String> fieldsWhitelist,
    final Schema keySchema,
    final Schema schema,
    final KeyValue headers
) {
    final boolean hasValueSchema = schema != null;
    if (hasValueSchema && schema.getFieldType() != FieldType.STRUCT) {
        throw new ConnectException("Value schema must be of type Struct");
    }

    final Map<String, SinkRecordField> fieldsByName = new HashMap<>();
    final Set<String> keyFieldNames = new LinkedHashSet<>();

    // The helpers fill both keyFieldNames and fieldsByName as a side effect.
    switch (pkMode) {
        case RECORD_KEY:
            extractRecordKeyPk(tableName, configuredPkFields, keySchema, fieldsByName, keyFieldNames);
            break;
        case RECORD_VALUE:
            extractRecordValuePk(tableName, configuredPkFields, schema, headers, fieldsByName, keyFieldNames);
            break;
        case NONE:
            break;
        default:
            throw new ConnectException("Unknown primary key mode: " + pkMode);
    }

    // With no value schema there is nothing to iterate in either pass below.
    final List<Field> valueFields =
        hasValueSchema ? schema.getFields() : Collections.<Field>emptyList();

    // Pass 1: register value fields that are neither key fields nor filtered out.
    final Set<String> nonKeyFieldNames = new LinkedHashSet<>();
    for (Field valueField : valueFields) {
        final String name = valueField.getName();
        final boolean accepted = !keyFieldNames.contains(name)
            && (fieldsWhitelist.isEmpty() || fieldsWhitelist.contains(name));
        if (accepted) {
            nonKeyFieldNames.add(name);
            fieldsByName.put(name, new SinkRecordField(valueField.getSchema(), name, false));
        }
    }

    if (fieldsByName.isEmpty()) {
        throw new ConnectException(
            "No fields found using key and value schemas for table: " + tableName
        );
    }

    // Pass 2: fields known to the value schema keep the schema's declaration order.
    final Map<String, SinkRecordField> orderedFields = new LinkedHashMap<>();
    for (Field valueField : valueFields) {
        final String name = valueField.getName();
        if (fieldsByName.containsKey(name)) {
            orderedFields.put(name, fieldsByName.get(name));
        }
    }

    // Anything not in the value schema is appended afterwards in sorted name
    // order so the result is deterministic.
    if (orderedFields.size() < fieldsByName.size()) {
        final List<String> sortedNames = new ArrayList<>(fieldsByName.keySet());
        Collections.sort(sortedNames);
        for (String name : sortedNames) {
            if (orderedFields.containsKey(name)) {
                continue;
            }
            orderedFields.put(name, fieldsByName.get(name));
        }
    }

    return new FieldsMetadata(keyFieldNames, nonKeyFieldNames, orderedFields);
}