in storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java [162:194]
private void startLocalCache() {
try {
CommonUtil.mkdir(cachePath);
List<byte[]> cfs = RocksDB.listColumnFamilies(options, cachePath);
if (cfs.size() <= 1) {
List<byte[]> columnFamilies = Arrays.asList(STORAGE_ROCKSDB_SCHEMA_COLUMN_FAMILY,
STORAGE_ROCKSDB_SUBJECT_COLUMN_FAMILY);
cache = org.rocksdb.RocksDB.open(options, cachePath);
cfDescriptors.addAll(columnFamilies.stream()
.map(ColumnFamilyDescriptor::new)
.collect(Collectors.toList()));
cfHandleList.addAll(cache.createColumnFamilies(cfDescriptors));
} else {
cfDescriptors.addAll(cfs.stream()
.map(ColumnFamilyDescriptor::new)
.collect(Collectors.toList()));
cache = org.rocksdb.RocksDB.open(dbOptions, cachePath, cfDescriptors, cfHandleList);
}
cfHandleMap.putAll(
cfHandleList.stream().collect(Collectors.toMap(h -> {
try {
return new String(h.getName());
} catch (RocksDBException e) {
throw new SchemaException("Failed to open RocksDB", e);
}
}, h -> h)));
assert cfHandleList.size() >= 2;
} catch (RocksDBException e) {
throw new SchemaException("Failed to open RocksDB", e);
}
}