in oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java [136:230]
public void createTable(Model model) throws StorageException {
try {
DownSamplingConfigService configService = moduleManager.find(CoreModule.NAME)
.provider()
.getService(DownSamplingConfigService.class);
if (model.isTimeSeries()) {
if (model.isRecord()) { // stream
StreamModel streamModel = MetadataRegistry.INSTANCE.registerStreamModel(model, config, configService);
Stream stream = streamModel.getStream();
if (stream != null) {
log.info("install stream schema {}", model.getName());
final BanyanDBClient client = ((BanyanDBStorageClient) this.client).client;
try {
client.define(stream);
if (CollectionUtils.isNotEmpty(streamModel.getIndexRules())) {
for (IndexRule indexRule : streamModel.getIndexRules()) {
defineIndexRule(model.getName(), indexRule, client);
}
defineIndexRuleBinding(
streamModel.getIndexRules(), stream.getMetadata().getGroup(), stream.getMetadata().getName(),
BanyandbCommon.Catalog.CATALOG_STREAM, client
);
}
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
log.info(
"Stream schema {}_{} already created by another OAP node",
model.getName(),
model.getDownsampling()
);
} else {
throw ex;
}
}
}
} else { // measure
MeasureModel measureModel = MetadataRegistry.INSTANCE.registerMeasureModel(model, config, configService);
Measure measure = measureModel.getMeasure();
if (measure != null) {
log.info("install measure schema {}", model.getName());
final BanyanDBClient client = ((BanyanDBStorageClient) this.client).client;
try {
client.define(measure);
if (CollectionUtils.isNotEmpty(measureModel.getIndexRules())) {
for (IndexRule indexRule : measureModel.getIndexRules()) {
defineIndexRule(model.getName(), indexRule, client);
}
defineIndexRuleBinding(
measureModel.getIndexRules(), measure.getMetadata().getGroup(), measure.getMetadata().getName(),
BanyandbCommon.Catalog.CATALOG_MEASURE, client
);
}
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
log.info("Measure schema {}_{} already created by another OAP node",
model.getName(),
model.getDownsampling());
} else {
throw ex;
}
}
final MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model);
try {
defineTopNAggregation(schema, client);
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
log.info("Measure schema {}_{} TopN({}) already created by another OAP node",
model.getName(),
model.getDownsampling(),
schema.getTopNSpec());
} else {
throw ex;
}
}
}
}
} else {
PropertyModel propertyModel = MetadataRegistry.INSTANCE.registerPropertyModel(model, config);
Property property = propertyModel.getProperty();
log.info("install property schema {}", model.getName());
final BanyanDBClient client = ((BanyanDBStorageClient) this.client).client;
try {
client.define(property);
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
log.info("Property schema {} already created by another OAP node", model.getName());
} else {
throw ex;
}
}
}
} catch (BanyanDBException ex) {
throw new StorageException("fail to create schema " + model.getName(), ex);
}
}