public static FieldsMetadata extract()

in connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/metadata/FieldsMetadata.java [93:160]


    public static FieldsMetadata extract(
            final String tableName,
            final JdbcSinkConfig.PrimaryKeyMode pkMode,
            final List<String> configuredPkFields,
            final Set<String> fieldsWhitelist,
            final Schema keySchema,
            final Schema schema,
            final KeyValue headers
    ) {
        if (schema != null && schema.getFieldType() != FieldType.STRUCT) {
            throw new ConnectException("Value schema must be of type Struct");
        }
        final Map<String, SinkRecordField> allFields = new HashMap<>();
        final Set<String> keyFieldNames = new LinkedHashSet<>();
        switch (pkMode) {
            case NONE:
                break;
            case RECORD_KEY:
                extractRecordKeyPk(tableName, configuredPkFields, keySchema, allFields, keyFieldNames);
                break;
            case RECORD_VALUE:
                extractRecordValuePk(tableName, configuredPkFields, schema, headers, allFields, keyFieldNames);
                break;
            default:
                throw new ConnectException("Unknown primary key mode: " + pkMode);
        }
        final Set<String> nonKeyFieldNames = new LinkedHashSet<>();
        if (schema != null) {
            for (Field field : schema.getFields()) {
                if (keyFieldNames.contains(field.getName())) {
                    continue;
                }
                if (!fieldsWhitelist.isEmpty() && !fieldsWhitelist.contains(field.getName())) {
                    continue;
                }
                nonKeyFieldNames.add(field.getName());
                final Schema fieldSchema = field.getSchema();
                allFields.put(field.getName(), new SinkRecordField(fieldSchema, field.getName(), false));
            }
        }

        if (allFields.isEmpty()) {
            throw new ConnectException(
                    "No fields found using key and value schemas for table: " + tableName
            );
        }
        final Map<String, SinkRecordField> allFieldsOrdered = new LinkedHashMap<>();
        if (schema != null) {
            for (Field field : schema.getFields()) {
                String fieldName = field.getName();
                if (allFields.containsKey(fieldName)) {
                    allFieldsOrdered.put(fieldName, allFields.get(fieldName));
                }
            }
        }

        if (allFieldsOrdered.size() < allFields.size()) {
            ArrayList<String> fieldKeys = new ArrayList<>(allFields.keySet());
            Collections.sort(fieldKeys);
            for (String fieldName : fieldKeys) {
                if (!allFieldsOrdered.containsKey(fieldName)) {
                    allFieldsOrdered.put(fieldName, allFields.get(fieldName));
                }
            }
        }

        return new FieldsMetadata(keyFieldNames, nonKeyFieldNames, allFieldsOrdered);
    }