protected int runCmd()

in stream/distributedlog/core/src/main/java/org/apache/distributedlog/admin/DistributedLogAdmin.java [527:652]


        /**
         * Queries, creates or updates the bookkeeper binding recorded for a distributedlog uri.
         *
         * <p>The first positional argument is the distributedlog uri. Options read here:
         * <ul>
         *   <li>{@code q} - query only: print the current binding (if any) and exit.</li>
         *   <li>{@code l}, {@code s} - bookkeeper ledgers path and bookkeeper zookeeper servers
         *       (writer side); both are required unless {@code q} is given.</li>
         *   <li>{@code bkzr} - bookkeeper zookeeper servers for readers; defaults to {@code s}.</li>
         *   <li>{@code dlzw} - distributedlog zookeeper servers for writers; defaults to the
         *       servers extracted from the uri.</li>
         *   <li>{@code dlzr} - distributedlog zookeeper servers for readers; defaults to the
         *       writer value.</li>
         *   <li>{@code i} - sanity-check txn id flag; {@code true} when the option is absent.</li>
         *   <li>{@code r} - encode region id flag; {@code false} when the option is absent.</li>
         *   <li>{@code seqno} - first log segment sequence number.</li>
         *   <li>{@code fns} - mark the namespace as federated.</li>
         *   <li>{@code f} - skip the interactive confirmation before replacing a binding.</li>
         *   <li>{@code c} - create the binding instead of updating it.</li>
         * </ul>
         *
         * @param cmdline the parsed command line
         * @return {@code 0} when the query or bind completed, when no change was needed, or when
         *         the change was declined/rejected; {@code -1} when required arguments are
         *         missing or when writing the binding failed with an {@link IOException}
         * @throws Exception on any failure that is not handled locally (for example a malformed
         *         uri, an unparsable {@code seqno} value, or an error while creating the
         *         federated namespace other than "node exists")
         */
        protected int runCmd(CommandLine cmdline) throws Exception {
            boolean isQuery = cmdline.hasOption("q");
            // A bind needs both the ledgers path and the bookkeeper zk servers; a query needs neither.
            if (!isQuery && (!cmdline.hasOption("l") || !cmdline.hasOption("s"))) {
                System.err.println("Error: Neither zkServers nor ledgersPath specified for bookkeeper environment.");
                printUsage();
                return -1;
            }
            String[] args = cmdline.getArgs();
            if (args.length <= 0) {
                System.err.println("No distributedlog uri specified.");
                printUsage();
                return -1;
            }
            boolean force = cmdline.hasOption("f");
            boolean creation = cmdline.hasOption("c");
            String bkLedgersPath = cmdline.getOptionValue("l");
            String bkZkServersForWriter = cmdline.getOptionValue("s");
            // Sanity checking is on unless "-i" explicitly carries a non-"true" value.
            boolean sanityCheckTxnID =
                    !cmdline.hasOption("i") || Boolean.parseBoolean(cmdline.getOptionValue("i"));
            // Region-id encoding is off unless "-r" explicitly carries "true".
            boolean encodeRegionID =
                    cmdline.hasOption("r") && Boolean.parseBoolean(cmdline.getOptionValue("r"));

            // Readers fall back to the writer's bookkeeper zookeeper servers.
            String bkZkServersForReader = cmdline.hasOption("bkzr")
                    ? cmdline.getOptionValue("bkzr")
                    : bkZkServersForWriter;

            URI uri = URI.create(args[0]);

            // Writers fall back to the zookeeper servers named in the uri; readers fall back to writers.
            String dlZkServersForWriter = cmdline.hasOption("dlzw")
                    ? cmdline.getOptionValue("dlzw")
                    : BKNamespaceDriver.getZKServersFromDLUri(uri);
            String dlZkServersForReader = cmdline.hasOption("dlzr")
                    ? cmdline.getOptionValue("dlzr")
                    : dlZkServersForWriter;

            // resolving the uri to see if there is another bindings in this uri.
            ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder().uri(uri).zkAclId(null)
                    .sessionTimeoutMs(10000).build();
            try {
                BKDLConfig newBKDLConfig =
                        new BKDLConfig(dlZkServersForWriter, dlZkServersForReader,
                                       bkZkServersForWriter, bkZkServersForReader, bkLedgersPath)
                                .setSanityCheckTxnID(sanityCheckTxnID)
                                .setEncodeRegionID(encodeRegionID);

                if (cmdline.hasOption("seqno")) {
                    newBKDLConfig = newBKDLConfig.setFirstLogSegmentSeqNo(
                            Long.parseLong(cmdline.getOptionValue("seqno")));
                }

                if (cmdline.hasOption("fns")) {
                    newBKDLConfig = newBKDLConfig.setFederatedNamespace(true);
                }

                BKDLConfig bkdlConfig;
                try {
                    bkdlConfig = BKDLConfig.resolveDLConfig(zkc, uri);
                } catch (IOException ie) {
                    // A failed resolution is treated the same as "nothing bound yet".
                    bkdlConfig = null;
                }
                if (null == bkdlConfig) {
                    System.out.println("No bookkeeper is bound to " + uri);
                } else {
                    System.out.println("There is bookkeeper bound to " + uri + " : ");
                    System.out.println();
                    System.out.println(bkdlConfig);
                    System.out.println();
                    if (!isQuery) {
                        if (newBKDLConfig.equals(bkdlConfig)) {
                            System.out.println("No bookkeeper binding needs to be updated. Quit.");
                            return 0;
                        } else if (!newBKDLConfig.isFederatedNamespace() && bkdlConfig.isFederatedNamespace()) {
                            System.out.println("You can't turn a federated namespace back to non-federated.");
                            return 0;
                        } else if (!force && !IOUtils.confirmPrompt("Do you want to bind " + uri
                                    + " with new bookkeeper instance :\n" + newBKDLConfig)) {
                            // The operator declined to replace the existing binding.
                            return 0;
                        }
                    }
                }
                if (isQuery) {
                    System.out.println("Done.");
                    return 0;
                }
                DLMetadata dlMetadata = DLMetadata.create(newBKDLConfig);
                if (creation) {
                    try {
                        dlMetadata.create(uri);
                        System.out.println("Created binding on " + uri + ".");
                    } catch (IOException ie) {
                        System.err.println("Failed to create binding on " + uri + " : " + ie.getMessage());
                        // Report the failure to the caller instead of falling through to success.
                        return -1;
                    }
                } else {
                    try {
                        dlMetadata.update(uri);
                        System.out.println("Updated binding on " + uri + " : ");
                        System.out.println();
                        System.out.println(newBKDLConfig.toString());
                        System.out.println();
                    } catch (IOException ie) {
                        System.err.println("Failed to update binding on " + uri + " : " + ie.getMessage());
                        // Report the failure to the caller instead of falling through to success.
                        return -1;
                    }
                }
                // Only reached once the binding was written successfully.
                if (newBKDLConfig.isFederatedNamespace()) {
                    try {
                        FederatedZKLogMetadataStore.createFederatedNamespace(uri, zkc);
                    } catch (KeeperException.NodeExistsException nee) {
                        // ignore node exists exception
                    }
                }
                return 0;
            } finally {
                zkc.close();
            }
        }