private void consumeMessage()

in storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java [225:283]


        private void consumeMessage(MessageExt msg) {
            if (msg.getKeys() == null) {
                return;
            }
            synchronized (this) {
                try {
                    log.info("receive msg, queue={}, offset={}, key={}, the content is {}", msg.getQueueId(),
                        msg.getQueueOffset(), msg.getKeys(), new String(msg.getBody()));
                    byte[] schemaFullName = converter.toBytes(msg.getKeys());
                    byte[] schemaInfoBytes = msg.getBody();
                    SchemaInfo update = converter.fromJson(schemaInfoBytes, SchemaInfo.class);
                    boolean isSchemaDeleted = Boolean.parseBoolean(msg.getUserProperty(DELETE_KEYS));
                    if (isSchemaDeleted) {
                        // delete
                        log.info("receive delete schema msg, schema = {}", update);
                        deleteAllSubject(update);
                        cache.delete(schemaCfHandle(), schemaFullName);
                    }
                    else {
                        byte[] lastRecordBytes = converter.toJsonAsBytes(update.getLastRecord());

                        byte[] result = cache.get(schemaCfHandle(), schemaFullName);
                        if (result == null) {
                            // register
                            cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes);
                            cache.put(subjectCfHandle(), converter.toBytes(update.subjectFullName()), lastRecordBytes);
                        } else {
                            SchemaInfo current = converter.fromJson(result, SchemaInfo.class);
                            boolean isVersionDeleted = current.getRecordCount() > update.getRecordCount();
                            if (current.getLastModifiedTime() != null && update.getLastModifiedTime() != null &&
                                current.getLastModifiedTime().after(update.getLastModifiedTime())) {
                                log.info("Current Schema is later version, no need to update.");
                                return;
                            }
                            if (current.getLastRecordVersion() == update.getLastRecordVersion() && !isVersionDeleted) {
                                log.info("Schema version is the same, no need to update.");
                                return;
                            }
                            if (current.getLastRecordVersion() > update.getLastRecordVersion() && !isVersionDeleted) {
                                throw new SchemaException("Schema version is invalid, update: "
                                    + update.getLastRecordVersion() + ", but current: " + current.getLastRecordVersion());
                            }

                            cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes);
                            update.getLastRecord().getSubjects().forEach(subject -> {
                                try {
                                    cache.put(subjectCfHandle(), converter.toBytes(subject.fullName()), lastRecordBytes);
                                } catch (RocksDBException e) {
                                    throw new SchemaException("Update schema: " + update.getQualifiedName() + " failed.", e);
                                }
                            });
                        }
                    }
                } catch (Throwable e) {
                    log.error("Update schema cache failed, msg {}", new String(msg.getBody()), e);
                    throw new SchemaException("Update schema " + msg.getKeys() + " failed.", e);
                }
            }
        }