in storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java [225:283]
private void consumeMessage(MessageExt msg) {
if (msg.getKeys() == null) {
return;
}
synchronized (this) {
try {
log.info("receive msg, queue={}, offset={}, key={}, the content is {}", msg.getQueueId(),
msg.getQueueOffset(), msg.getKeys(), new String(msg.getBody()));
byte[] schemaFullName = converter.toBytes(msg.getKeys());
byte[] schemaInfoBytes = msg.getBody();
SchemaInfo update = converter.fromJson(schemaInfoBytes, SchemaInfo.class);
boolean isSchemaDeleted = Boolean.parseBoolean(msg.getUserProperty(DELETE_KEYS));
if (isSchemaDeleted) {
// delete
log.info("receive delete schema msg, schema = {}", update);
deleteAllSubject(update);
cache.delete(schemaCfHandle(), schemaFullName);
}
else {
byte[] lastRecordBytes = converter.toJsonAsBytes(update.getLastRecord());
byte[] result = cache.get(schemaCfHandle(), schemaFullName);
if (result == null) {
// register
cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes);
cache.put(subjectCfHandle(), converter.toBytes(update.subjectFullName()), lastRecordBytes);
} else {
SchemaInfo current = converter.fromJson(result, SchemaInfo.class);
boolean isVersionDeleted = current.getRecordCount() > update.getRecordCount();
if (current.getLastModifiedTime() != null && update.getLastModifiedTime() != null &&
current.getLastModifiedTime().after(update.getLastModifiedTime())) {
log.info("Current Schema is later version, no need to update.");
return;
}
if (current.getLastRecordVersion() == update.getLastRecordVersion() && !isVersionDeleted) {
log.info("Schema version is the same, no need to update.");
return;
}
if (current.getLastRecordVersion() > update.getLastRecordVersion() && !isVersionDeleted) {
throw new SchemaException("Schema version is invalid, update: "
+ update.getLastRecordVersion() + ", but current: " + current.getLastRecordVersion());
}
cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes);
update.getLastRecord().getSubjects().forEach(subject -> {
try {
cache.put(subjectCfHandle(), converter.toBytes(subject.fullName()), lastRecordBytes);
} catch (RocksDBException e) {
throw new SchemaException("Update schema: " + update.getQualifiedName() + " failed.", e);
}
});
}
}
} catch (Throwable e) {
log.error("Update schema cache failed, msg {}", new String(msg.getBody()), e);
throw new SchemaException("Update schema " + msg.getKeys() + " failed.", e);
}
}
}