in storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java [287:311]
/**
 * Registers a new schema by publishing it as a message to the storage topic.
 *
 * <p>The existence check against the cache and the send are performed while holding
 * this instance's monitor, so two callers on the same client cannot interleave between
 * the check and the send. This method does not write to the cache itself.
 * NOTE(review): presumably the cache is populated elsewhere once the message is
 * consumed — confirm against the consumer side.
 *
 * @param schema the schema to register; its full name is used as the cache lookup key
 *               and as the message key, and its JSON form as the message body
 * @return the same {@code schema} instance that was passed in
 * @throws SchemaException if an entry for the schema already exists in the cache
 *                         (raised as {@code SchemaExistException}), if the send status
 *                         is not {@code SEND_OK}, or if any other failure occurs while
 *                         checking the cache or sending (original cause preserved)
 */
public SchemaInfo registerSchema(SchemaInfo schema) {
    // Serialization happens outside the try block, so converter failures propagate unwrapped.
    byte[] schemaFullName = converter.toBytes(schema.schemaFullName());
    byte[] schemaInfo = converter.toJsonAsBytes(schema);
    try {
        synchronized (this) {
            if (cache.get(schemaCfHandle(), schemaFullName) != null) {
                throw new SchemaExistException(schema.getQualifiedName());
            }
            // Empty tags; the schema full name is passed as the message key.
            Message message = new Message(storageTopic, "", schema.schemaFullName(), schemaInfo);
            SendResult result = sendOrderMessageToRocketmq(message);
            if (!result.getSendStatus().equals(SendStatus.SEND_OK)) {
                throw new SchemaException("Register schema: " + schema.getQualifiedName() + " failed: " + result.getSendStatus());
            }
            log.info("send message success, msgId = {}", result.getMsgId());
        }
        return schema;
    } catch (SchemaException e) {
        // Domain errors are already meaningful to callers; rethrow unchanged.
        throw e;
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Wrapping would otherwise hide the interruption; restore the flag so
            // callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
        }
        throw new SchemaException("register schema failed", e);
    }
}