in core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java [89:131]
/**
 * Registers a new schema under the given qualified name and returns the
 * identifier and version of the record that was created.
 *
 * <p>Order of operations: permission check for the tenant, existence check
 * for the qualified name, assembly of the audit/meta/first-record parts into
 * a {@code SchemaInfo}, optional dependency compilation (only when upload is
 * enabled in the configuration), and finally persistence through the storage
 * service proxy.
 *
 * @param qualifiedName     supplies the tenant, schema name, schema full name
 *                          and subject info used to populate the new schema
 * @param registerSchemaDto supplies the owner, description, compatibility,
 *                          schema type and IDL of the schema to register
 * @return a response carrying the schema record id (derived from the schema's
 *         unique id and its last record version) and that last record version
 */
public RegisterSchemaResponse register(QualifiedName qualifiedName, RegisterSchemaRequest registerSchemaDto) {
    final RequestContext requestContext = RequestContextManager.getContext();
    // Parameterized form: consistent with the other log call in this method and
    // skips message formatting when INFO is disabled.
    log.info("register get request context: {}", requestContext);

    // TODO: add user and ak sk
    // Until then the permission check runs with an empty user string.
    accessController.checkPermission("", qualifiedName.getTenant(), SchemaOperation.REGISTER);

    // NOTE(review): presumably rejects the request when the schema already
    // exists - confirm against checkSchemaExist.
    checkSchemaExist(qualifiedName);

    final AuditInfo audit = new AuditInfo();
    audit.createBy(registerSchemaDto.getOwner(), registerSchemaDto.getDesc());

    // The same generated id is stored on both the meta info and the first record.
    long schemaId = idGenerator.nextId();

    final SchemaMetaInfo meta = new SchemaMetaInfo();
    meta.setCompatibility(registerSchemaDto.getCompatibility());
    meta.setOwner(registerSchemaDto.getOwner());
    meta.setType(registerSchemaDto.getSchemaType());
    meta.setSchemaName(qualifiedName.getSchema());
    meta.setTenant(qualifiedName.getTenant());
    meta.setUniqueId(schemaId);

    final SchemaRecordInfo firstRecord = new SchemaRecordInfo();
    firstRecord.setSchema(qualifiedName.schemaFullName());
    firstRecord.setSchemaId(schemaId);
    firstRecord.setType(registerSchemaDto.getSchemaType());
    firstRecord.setIdl(registerSchemaDto.getSchemaIdl());
    firstRecord.bindSubject(qualifiedName.subjectInfo());

    final SchemaDetailInfo details = new SchemaDetailInfo(firstRecord);
    final SchemaInfo schemaInfo = new SchemaInfo(qualifiedName, audit, meta, details);

    if (config.isUploadEnabled()) {
        // TODO: async upload to speed up register operation and keep atomic with register
        Dependency dependency = dependencyService.compile(schemaInfo);
        schemaInfo.setLastRecordDependency(dependency);
    }

    log.info("Creating schema info {}: {}", qualifiedName, schemaInfo);
    storageServiceProxy.register(schemaInfo);

    // Unique id and version are read back from schemaInfo after the storage
    // call rather than reusing the local schemaId.
    long schemaRecordId = CommonUtil.getSchemaRecordId(
        schemaInfo.getUniqueId(), schemaInfo.getLastRecordVersion());
    return new RegisterSchemaResponse(schemaRecordId, schemaInfo.getLastRecordVersion());
}