in storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java [337:368]
public void deleteByVersion(QualifiedName name) {
SchemaInfo schemaInfo = getSchemaInfoBySubject(name.subjectFullName());
if (schemaInfo == null || schemaInfo.getDetails() == null || schemaInfo.getDetails().getSchemaRecords() == null) {
throw new SchemaNotFoundException(name);
}
List<SubjectInfo> subjects = schemaInfo.getLastRecord().getSubjects();
List<SchemaRecordInfo> schemaRecords = schemaInfo.getDetails().getSchemaRecords();
schemaRecords.removeIf(record -> record.getVersion() == name.getVersion());
if (CollectionUtils.isEmpty(schemaRecords)) {
deleteBySubject(name);
}
// delete but still need bind subject
if (schemaInfo.getLastRecord().getSubjects().isEmpty()) {
schemaInfo.getLastRecord().setSubjects(subjects);
}
byte[] schemaInfoBytes = converter.toJsonAsBytes(schemaInfo);
try {
synchronized (this) {
Message msg = new Message(storageTopic, "", schemaInfo.schemaFullName(), schemaInfoBytes);
SendResult result = sendOrderMessageToRocketmq(msg);
if (result.getSendStatus() != SendStatus.SEND_OK) {
throw new SchemaException("Update " + name + " failed: " + result.getSendStatus());
}
}
} catch (SchemaException e) {
throw e;
} catch (Exception e) {
throw new SchemaException("Update schema " + name + " failed", e);
}
}