in stream/distributedlog/core/src/main/java/org/apache/distributedlog/admin/DistributedLogAdmin.java [527:652]
protected int runCmd(CommandLine cmdline) throws Exception {
boolean isQuery = cmdline.hasOption("q");
if (!isQuery && (!cmdline.hasOption("l") || !cmdline.hasOption("s"))) {
System.err.println("Error: Neither zkServers nor ledgersPath specified for bookkeeper environment.");
printUsage();
return -1;
}
String[] args = cmdline.getArgs();
if (args.length <= 0) {
System.err.println("No distributedlog uri specified.");
printUsage();
return -1;
}
boolean force = cmdline.hasOption("f");
boolean creation = cmdline.hasOption("c");
String bkLedgersPath = cmdline.getOptionValue("l");
String bkZkServersForWriter = cmdline.getOptionValue("s");
boolean sanityCheckTxnID =
!cmdline.hasOption("i") || Boolean.parseBoolean(cmdline.getOptionValue("i"));
boolean encodeRegionID =
cmdline.hasOption("r") && Boolean.parseBoolean(cmdline.getOptionValue("r"));
String bkZkServersForReader;
if (cmdline.hasOption("bkzr")) {
bkZkServersForReader = cmdline.getOptionValue("bkzr");
} else {
bkZkServersForReader = bkZkServersForWriter;
}
URI uri = URI.create(args[0]);
String dlZkServersForWriter;
String dlZkServersForReader;
if (cmdline.hasOption("dlzw")) {
dlZkServersForWriter = cmdline.getOptionValue("dlzw");
} else {
dlZkServersForWriter = BKNamespaceDriver.getZKServersFromDLUri(uri);
}
if (cmdline.hasOption("dlzr")) {
dlZkServersForReader = cmdline.getOptionValue("dlzr");
} else {
dlZkServersForReader = dlZkServersForWriter;
}
// resolving the uri to see if there is another bindings in this uri.
ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder().uri(uri).zkAclId(null)
.sessionTimeoutMs(10000).build();
try {
BKDLConfig newBKDLConfig =
new BKDLConfig(dlZkServersForWriter, dlZkServersForReader,
bkZkServersForWriter, bkZkServersForReader, bkLedgersPath)
.setSanityCheckTxnID(sanityCheckTxnID)
.setEncodeRegionID(encodeRegionID);
if (cmdline.hasOption("seqno")) {
newBKDLConfig = newBKDLConfig.setFirstLogSegmentSeqNo(
Long.parseLong(cmdline.getOptionValue("seqno")));
}
if (cmdline.hasOption("fns")) {
newBKDLConfig = newBKDLConfig.setFederatedNamespace(true);
}
BKDLConfig bkdlConfig;
try {
bkdlConfig = BKDLConfig.resolveDLConfig(zkc, uri);
} catch (IOException ie) {
bkdlConfig = null;
}
if (null == bkdlConfig) {
System.out.println("No bookkeeper is bound to " + uri);
} else {
System.out.println("There is bookkeeper bound to " + uri + " : ");
System.out.println();
System.out.println(bkdlConfig);
System.out.println();
if (!isQuery) {
if (newBKDLConfig.equals(bkdlConfig)) {
System.out.println("No bookkeeper binding needs to be updated. Quit.");
return 0;
} else if (!newBKDLConfig.isFederatedNamespace() && bkdlConfig.isFederatedNamespace()) {
System.out.println("You can't turn a federated namespace back to non-federated.");
return 0;
} else {
if (!force && !IOUtils.confirmPrompt("Do you want to bind " + uri
+ " with new bookkeeper instance :\n" + newBKDLConfig)) {
return 0;
}
}
}
}
if (isQuery) {
System.out.println("Done.");
return 0;
}
DLMetadata dlMetadata = DLMetadata.create(newBKDLConfig);
if (creation) {
try {
dlMetadata.create(uri);
System.out.println("Created binding on " + uri + ".");
} catch (IOException ie) {
System.err.println("Failed to create binding on " + uri + " : " + ie.getMessage());
}
} else {
try {
dlMetadata.update(uri);
System.out.println("Updated binding on " + uri + " : ");
System.out.println();
System.out.println(newBKDLConfig.toString());
System.out.println();
} catch (IOException ie) {
System.err.println("Failed to update binding on " + uri + " : " + ie.getMessage());
}
}
if (newBKDLConfig.isFederatedNamespace()) {
try {
FederatedZKLogMetadataStore.createFederatedNamespace(uri, zkc);
} catch (KeeperException.NodeExistsException nee) {
// ignore node exists exception
}
}
return 0;
} finally {
zkc.close();
}
}