in stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java [373:487]
static void createMissingMetadata(final ZooKeeper zk,
final String basePath,
final String logRootPath,
final List<Versioned<byte[]>> metadatas,
final List<ACL> acl,
final boolean ownAllocator,
final boolean createIfNotExists,
final CompletableFuture<List<Versioned<byte[]>>> promise) {
final List<byte[]> pathsToCreate = Lists.newArrayListWithExpectedSize(metadatas.size());
final List<Op> zkOps = Lists.newArrayListWithExpectedSize(metadatas.size());
CreateMode createMode = CreateMode.PERSISTENT;
// log root parent path
String logRootParentPath = Utils.getParent(logRootPath);
if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT_PARENT))) {
pathsToCreate.add(null);
} else {
pathsToCreate.add(EMPTY_BYTES);
zkOps.add(Op.create(logRootParentPath, EMPTY_BYTES, acl, createMode));
}
// log root path
if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT))) {
pathsToCreate.add(null);
} else {
pathsToCreate.add(EMPTY_BYTES);
zkOps.add(Op.create(logRootPath, EMPTY_BYTES, acl, createMode));
}
// max id
if (pathExists(metadatas.get(MetadataIndex.MAX_TXID))) {
pathsToCreate.add(null);
} else {
byte[] zeroTxnIdData = DLUtils.serializeTransactionId(0L);
pathsToCreate.add(zeroTxnIdData);
zkOps.add(Op.create(logRootPath + MAX_TXID_PATH, zeroTxnIdData, acl, createMode));
}
// version
if (pathExists(metadatas.get(MetadataIndex.VERSION))) {
pathsToCreate.add(null);
} else {
byte[] versionData = intToBytes(LAYOUT_VERSION);
pathsToCreate.add(versionData);
zkOps.add(Op.create(logRootPath + VERSION_PATH, versionData, acl, createMode));
}
// lock path
if (pathExists(metadatas.get(MetadataIndex.LOCK))) {
pathsToCreate.add(null);
} else {
pathsToCreate.add(EMPTY_BYTES);
zkOps.add(Op.create(logRootPath + LOCK_PATH, EMPTY_BYTES, acl, createMode));
}
// read lock path
if (pathExists(metadatas.get(MetadataIndex.READ_LOCK))) {
pathsToCreate.add(null);
} else {
pathsToCreate.add(EMPTY_BYTES);
zkOps.add(Op.create(logRootPath + READ_LOCK_PATH, EMPTY_BYTES, acl, createMode));
}
// log segments path
if (pathExists(metadatas.get(MetadataIndex.LOGSEGMENTS))) {
pathsToCreate.add(null);
} else {
byte[] logSegmentsData = DLUtils.serializeLogSegmentSequenceNumber(
DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO);
pathsToCreate.add(logSegmentsData);
zkOps.add(Op.create(logRootPath + LOGSEGMENTS_PATH, logSegmentsData, acl, createMode));
}
// allocation path
if (ownAllocator) {
if (pathExists(metadatas.get(MetadataIndex.ALLOCATION))) {
pathsToCreate.add(null);
} else {
pathsToCreate.add(EMPTY_BYTES);
zkOps.add(Op.create(logRootPath + ALLOCATION_PATH,
EMPTY_BYTES, acl, createMode));
}
}
if (zkOps.isEmpty()) {
// nothing missed
promise.complete(metadatas);
return;
}
if (!createIfNotExists) {
promise.completeExceptionally(new LogNotFoundException("Log " + logRootPath + " not found"));
return;
}
getMissingPaths(zk, basePath, Utils.getParent(logRootParentPath))
.whenComplete(new FutureEventListener<List<String>>() {
@Override
public void onSuccess(List<String> paths) {
for (String path : paths) {
pathsToCreate.add(EMPTY_BYTES);
zkOps.add(
0, Op.create(path, EMPTY_BYTES, acl, createMode));
}
executeCreateMissingPathTxn(
zk,
zkOps,
pathsToCreate,
metadatas,
logRootPath,
promise
);
}
@Override
public void onFailure(Throwable cause) {
promise.completeExceptionally(cause);
return;
}
});
}