static void createMissingMetadata()

in stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java [373:487]


    static void createMissingMetadata(final ZooKeeper zk,
                                      final String basePath,
                                      final String logRootPath,
                                      final List<Versioned<byte[]>> metadatas,
                                      final List<ACL> acl,
                                      final boolean ownAllocator,
                                      final boolean createIfNotExists,
                                      final CompletableFuture<List<Versioned<byte[]>>> promise) {
        final List<byte[]> pathsToCreate = Lists.newArrayListWithExpectedSize(metadatas.size());
        final List<Op> zkOps = Lists.newArrayListWithExpectedSize(metadatas.size());
        CreateMode createMode = CreateMode.PERSISTENT;

        // log root parent path
        String logRootParentPath = Utils.getParent(logRootPath);
        if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT_PARENT))) {
            pathsToCreate.add(null);
        } else {
            pathsToCreate.add(EMPTY_BYTES);
            zkOps.add(Op.create(logRootParentPath, EMPTY_BYTES, acl, createMode));
        }

        // log root path
        if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT))) {
            pathsToCreate.add(null);
        } else {
            pathsToCreate.add(EMPTY_BYTES);
            zkOps.add(Op.create(logRootPath, EMPTY_BYTES, acl, createMode));
        }

        // max id
        if (pathExists(metadatas.get(MetadataIndex.MAX_TXID))) {
            pathsToCreate.add(null);
        } else {
            byte[] zeroTxnIdData = DLUtils.serializeTransactionId(0L);
            pathsToCreate.add(zeroTxnIdData);
            zkOps.add(Op.create(logRootPath + MAX_TXID_PATH, zeroTxnIdData, acl, createMode));
        }
        // version
        if (pathExists(metadatas.get(MetadataIndex.VERSION))) {
            pathsToCreate.add(null);
        } else {
            byte[] versionData = intToBytes(LAYOUT_VERSION);
            pathsToCreate.add(versionData);
            zkOps.add(Op.create(logRootPath + VERSION_PATH, versionData, acl, createMode));
        }
        // lock path
        if (pathExists(metadatas.get(MetadataIndex.LOCK))) {
            pathsToCreate.add(null);
        } else {
            pathsToCreate.add(EMPTY_BYTES);
            zkOps.add(Op.create(logRootPath + LOCK_PATH, EMPTY_BYTES, acl, createMode));
        }
        // read lock path
        if (pathExists(metadatas.get(MetadataIndex.READ_LOCK))) {
            pathsToCreate.add(null);
        } else {
            pathsToCreate.add(EMPTY_BYTES);
            zkOps.add(Op.create(logRootPath + READ_LOCK_PATH, EMPTY_BYTES, acl, createMode));
        }
        // log segments path
        if (pathExists(metadatas.get(MetadataIndex.LOGSEGMENTS))) {
            pathsToCreate.add(null);
        } else {
            byte[] logSegmentsData = DLUtils.serializeLogSegmentSequenceNumber(
                DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO);
            pathsToCreate.add(logSegmentsData);
            zkOps.add(Op.create(logRootPath + LOGSEGMENTS_PATH, logSegmentsData, acl, createMode));
        }
        // allocation path
        if (ownAllocator) {
            if (pathExists(metadatas.get(MetadataIndex.ALLOCATION))) {
                pathsToCreate.add(null);
            } else {
                pathsToCreate.add(EMPTY_BYTES);
                zkOps.add(Op.create(logRootPath + ALLOCATION_PATH,
                    EMPTY_BYTES, acl, createMode));
            }
        }
        if (zkOps.isEmpty()) {
            // nothing missed
            promise.complete(metadatas);
            return;
        }
        if (!createIfNotExists) {
            promise.completeExceptionally(new LogNotFoundException("Log " + logRootPath + " not found"));
            return;
        }

        getMissingPaths(zk, basePath, Utils.getParent(logRootParentPath))
            .whenComplete(new FutureEventListener<List<String>>() {
                @Override
                public void onSuccess(List<String> paths) {
                    for (String path : paths) {
                        pathsToCreate.add(EMPTY_BYTES);
                        zkOps.add(
                            0, Op.create(path, EMPTY_BYTES, acl, createMode));
                    }
                    executeCreateMissingPathTxn(
                        zk,
                        zkOps,
                        pathsToCreate,
                        metadatas,
                        logRootPath,
                        promise
                    );
                }

                @Override
                public void onFailure(Throwable cause) {
                    promise.completeExceptionally(cause);
                    return;
                }
            });

    }