in src/main/java/com/aliyun/dts/subscribe/clients/record/AvroRecordParser.java [138:174]
public static RecordSchema getRecordSchema(Record avroRecord) {
// parse db/schema/tb name
Triple<String, String, String> names = getNames(avroRecord.getObjectName());
// parse pk/uk names
Pair<Set<String>, List<Set<String>>> keyNamePair = getPrimaryAndUniqueKeyNames(avroRecord);
Set<String> pkNames = keyNamePair.getLeft();
Set<String> allUkNames = keyNamePair.getRight().stream()
.flatMap(Set::stream)
.collect(Collectors.toSet());
// parse record fields
Pair<List<RecordField>, RecordIndexInfo> recordFieldAndIndexInfo = getRecordFields(avroRecord, pkNames, allUkNames);
// compose record schema
String schemaId = names.toString();
List<RecordField> recordFields = recordFieldAndIndexInfo.getLeft();
RecordIndexInfo pkOrUkIndexInfo = recordFieldAndIndexInfo.getRight();
DefaultRecordSchema recordSchema = new DefaultRecordSchema(schemaId, names.getLeft(), names.getRight(), recordFields);
//db type and version
recordSchema.setDatabaseInfo(new DatabaseInfo(avroRecord.getSource().getSourceType().name(), avroRecord.getSource().getVersion()));
if (!pkOrUkIndexInfo.getIndexFields().isEmpty()) {
recordSchema.setPrimaryIndexInfo(pkOrUkIndexInfo);
}
// build uk index infos
for (Set<String> ukNameSet : keyNamePair.getRight()) {
RecordIndexInfo recordIndexInfo = new RecordIndexInfo(RecordIndexInfo.IndexType.UniqueKey);
for (String ukFieldName : ukNameSet) {
recordIndexInfo.addField(recordSchema.getField(ukFieldName).orElseThrow(() -> new RuntimeException(ukFieldName + " not found in record [" + avroRecord + "]")));
}
recordSchema.addUniqueIndexInfo(recordIndexInfo);
}
return recordSchema;
}