in connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/metadata/FieldsMetadata.java [162:219]
/**
 * Derives the primary-key columns from the record key schema and registers them.
 *
 * <p>Two key shapes are accepted:
 * <ul>
 *   <li><b>Struct key</b> - when {@code configuredPkFields} is empty every field of the key
 *       schema becomes a PK column; otherwise each configured name must exist in the key
 *       schema and exactly those names are added.</li>
 *   <li><b>Primitive key</b> - exactly one configured PK field name is required; that name is
 *       bound to the key schema itself.</li>
 * </ul>
 *
 * <p>Resolved names are added to {@code keyFieldNames}, and a {@link SinkRecordField} is put
 * into {@code allFields} for each of them.
 *
 * @param tableName          table name, used only in error messages
 * @param configuredPkFields PK field names supplied by configuration; may be empty
 * @param keySchema          schema of the record key; must not be {@code null}
 * @param allFields          output map of field name to field descriptor, mutated in place
 * @param keyFieldNames      output set of PK field names, mutated in place
 * @throws ConnectException if the key schema is missing, a configured PK field is absent from
 *                          a struct key schema, a primitive key is not paired with exactly
 *                          one configured PK field, or the key schema is neither primitive
 *                          nor a struct
 */
private static void extractRecordKeyPk(
    final String tableName,
    final List<String> configuredPkFields,
    final Schema keySchema,
    final Map<String, SinkRecordField> allFields,
    final Set<String> keyFieldNames
) {
    if (keySchema == null) {
        throw new ConnectException(String.format(
            "PK mode for table '%s' is %s, but record key schema is missing",
            tableName,
            JdbcSinkConfig.PrimaryKeyMode.RECORD_KEY
        ));
    }

    final FieldType keyType = keySchema.getFieldType();

    if (keyType == FieldType.STRUCT) {
        if (configuredPkFields.isEmpty()) {
            // No explicit PK configuration: every field of the struct key is a PK column.
            for (Field structField : keySchema.getFields()) {
                keyFieldNames.add(structField.getName());
            }
        } else {
            // Validate all configured names before touching the output set, so a failure
            // leaves keyFieldNames unchanged.
            for (String pkName : configuredPkFields) {
                if (keySchema.getField(pkName) == null) {
                    throw new ConnectException(String.format(
                        "PK mode for table '%s' is %s with configured PK fields %s, but record key "
                            + "schema does not contain field: %s",
                        tableName, JdbcSinkConfig.PrimaryKeyMode.RECORD_KEY, configuredPkFields, pkName
                    ));
                }
            }
            keyFieldNames.addAll(configuredPkFields);
        }

        // Register a descriptor for every name currently in keyFieldNames.
        for (String pkName : keyFieldNames) {
            final Schema pkSchema = keySchema.getField(pkName).getSchema();
            allFields.put(pkName, new SinkRecordField(pkSchema, pkName, true));
        }
        return;
    }

    if (!keyType.isPrimitive()) {
        throw new ConnectException(
            "Key schema must be primitive type or Struct, but is of type: " + keyType
        );
    }

    // Primitive key: the single configured name is the column that holds the whole key.
    if (configuredPkFields.size() != 1) {
        throw new ConnectException(String.format(
            "Need exactly one PK column defined since the key schema for records is a "
                + "primitive type, defined columns are: %s",
            configuredPkFields
        ));
    }
    final String pkColumn = configuredPkFields.get(0);
    keyFieldNames.add(pkColumn);
    allFields.put(pkColumn, new SinkRecordField(keySchema, pkColumn, true));
}