public static RecordSchema getRecordSchema()

in src/main/java/com/aliyun/dts/subscribe/clients/record/AvroRecordParser.java [138:174]


    /**
     * Assembles the {@link RecordSchema} for the given Avro record.
     *
     * <p>The schema is built in these steps:
     * <ol>
     *   <li>the db/schema/tb name triple is parsed from the record's object name;</li>
     *   <li>primary-key names and the per-unique-key name sets are collected, and all
     *       unique-key names are flattened into a single set;</li>
     *   <li>the record fields and the accompanying index info are built from the record
     *       and those name sets;</li>
     *   <li>a {@link DefaultRecordSchema} is created, using the string form of the name
     *       triple as its schema id together with the triple's left and right elements;</li>
     *   <li>database info (source type name and version) is attached;</li>
     *   <li>the index info produced with the fields is set as the primary index, but only
     *       when it contains at least one field;</li>
     *   <li>one {@code UniqueKey} index is added per unique-key name set.</li>
     * </ol>
     *
     * @param avroRecord the Avro record to derive the schema from
     * @return the assembled record schema
     * @throws RuntimeException if a unique-key field name cannot be found in the schema
     */
    public static RecordSchema getRecordSchema(Record avroRecord) {
        // db/schema/tb names, parsed from the object name
        final Triple<String, String, String> objectNames = getNames(avroRecord.getObjectName());

        // left side: pk names; right side: one name set per unique key
        final Pair<Set<String>, List<Set<String>>> keyNames = getPrimaryAndUniqueKeyNames(avroRecord);
        final Set<String> primaryKeyNames = keyNames.getLeft();
        final List<Set<String>> uniqueKeyNameSets = keyNames.getRight();
        final Set<String> flattenedUniqueKeyNames = uniqueKeyNameSets.stream()
                .flatMap(ukNames -> ukNames.stream())
                .collect(Collectors.toSet());

        // record fields plus the index info that is built alongside them
        final Pair<List<RecordField>, RecordIndexInfo> fieldsAndIndex =
                getRecordFields(avroRecord, primaryKeyNames, flattenedUniqueKeyNames);
        final List<RecordField> fields = fieldsAndIndex.getLeft();
        final RecordIndexInfo primaryIndex = fieldsAndIndex.getRight();

        // the schema id is the string form of the whole name triple
        final DefaultRecordSchema schema = new DefaultRecordSchema(
                objectNames.toString(), objectNames.getLeft(), objectNames.getRight(), fields);

        // database type and version of the source
        final String sourceTypeName = avroRecord.getSource().getSourceType().name();
        schema.setDatabaseInfo(new DatabaseInfo(sourceTypeName, avroRecord.getSource().getVersion()));

        // an index info without fields is not registered as the primary index
        final boolean hasPrimaryIndexFields = !primaryIndex.getIndexFields().isEmpty();
        if (hasPrimaryIndexFields) {
            schema.setPrimaryIndexInfo(primaryIndex);
        }

        // one unique index per unique-key name set; every name must resolve to a schema field
        for (Set<String> uniqueKeyNames : uniqueKeyNameSets) {
            final RecordIndexInfo uniqueIndex = new RecordIndexInfo(RecordIndexInfo.IndexType.UniqueKey);
            for (String ukFieldName : uniqueKeyNames) {
                final RecordField ukField = schema.getField(ukFieldName)
                        .orElseThrow(() -> new RuntimeException(ukFieldName + " not found in record [" + avroRecord + "]"));
                uniqueIndex.addField(ukField);
            }
            schema.addUniqueIndexInfo(uniqueIndex);
        }

        return schema;
    }