private static void extractRecordKeyPk()

in connectors/rocketmq-connect-doris/src/main/java/org/apache/rocketmq/connect/doris/sink/metadata/FieldsMetadata.java [164:222]


    /**
     * Derives the primary-key columns for a table from the record key schema and registers
     * them in the supplied collections.
     *
     * <p>For a {@code STRUCT} key schema, the key columns are the entries of
     * {@code configuredPkFields}, or every field of the key schema when that list is empty.
     * For a primitive key schema, the single entry of {@code configuredPkFields} names the
     * key column and the key schema itself is used as that column's schema.
     *
     * @param tableName          table name, used only in error messages
     * @param configuredPkFields configured primary-key field names; may be empty for a
     *                           {@code STRUCT} key, must hold exactly one name for a primitive key
     * @param keySchema          schema of the record key; must not be {@code null}
     * @param allFields          output map, receives one {@link SinkRecordField} per key column
     * @param keyFieldNames      output set, receives the key column names
     * @throws ConnectException if the key schema is missing, if a configured field is absent
     *                          from a {@code STRUCT} key schema, if a primitive key schema is
     *                          not paired with exactly one configured field, or if the key
     *                          schema is neither primitive nor {@code STRUCT}
     */
    private static void extractRecordKeyPk(
            final String tableName,
            final List<String> configuredPkFields,
            final Schema keySchema,
            final Map<String, SinkRecordField> allFields,
            final Set<String> keyFieldNames
    ) {
        if (keySchema == null) {
            final String missingSchemaMessage = String.format(
                    "PK mode for table '%s' is %s, but record key schema is missing",
                    tableName,
                    DorisSinkConfig.PrimaryKeyMode.RECORD_KEY
            );
            throw new ConnectException(missingSchemaMessage);
        }

        final FieldType keyType = keySchema.getFieldType();

        if (keyType == FieldType.STRUCT) {
            if (configuredPkFields.isEmpty()) {
                // Nothing configured: every field of the key struct becomes a key column.
                for (Field structMember : keySchema.getFields()) {
                    keyFieldNames.add(structMember.getName());
                }
            } else {
                // Validate all configured names before any of them is recorded.
                for (String configuredName : configuredPkFields) {
                    if (keySchema.getField(configuredName) != null) {
                        continue;
                    }
                    throw new ConnectException(String.format(
                            "PK mode for table '%s' is %s with configured PK fields %s, but record key "
                                    + "schema does not contain field: %s",
                            tableName, DorisSinkConfig.PrimaryKeyMode.RECORD_KEY, configuredPkFields, configuredName
                    ));
                }
                keyFieldNames.addAll(configuredPkFields);
            }

            // Iterates the whole set, so any name already present in keyFieldNames on entry
            // is looked up in the key schema as well.
            for (String pkName : keyFieldNames) {
                final Field pkField = keySchema.getField(pkName);
                allFields.put(pkName, new SinkRecordField(pkField.getSchema(), pkName, true));
            }
            return;
        }

        if (!keyType.isPrimitive()) {
            throw new ConnectException(
                    "Key schema must be primitive type or Struct, but is of type: " + keyType
            );
        }

        // A primitive key carries no field name of its own, so the column name has to come
        // from configuration.
        if (configuredPkFields.size() != 1) {
            throw new ConnectException(String.format(
                    "Need exactly one PK column defined since the key schema for records is a "
                            + "primitive type, defined columns are: %s",
                    configuredPkFields
            ));
        }
        final String pkColumn = configuredPkFields.get(0);
        keyFieldNames.add(pkColumn);
        allFields.put(pkColumn, new SinkRecordField(keySchema, pkColumn, true));
    }