in gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorageCli.java [60:157]
public void run(String[] args) {
Options options = new Options();
options.addOption(HELP);
options.addOption(ZK);
options.addOption(JOB_NAME);
options.addOption(ROOT_DIR);
options.addOption(WATCH);
CommandLine cli;
try {
CommandLineParser parser = new DefaultParser();
cli = parser.parse(options, Arrays.copyOfRange(args, 1, args.length));
} catch (ParseException pe) {
System.out.println( "Command line parse exception: " + pe.getMessage() );
return;
}
if (cli.hasOption(HELP.getOpt())) {
printUsage(options);
return;
}
TaskState taskState = new TaskState();
String jobName;
if (!cli.hasOption(JOB_NAME.getOpt())) {
log.error("Need Job Name to be specified --", JOB_NAME.getLongOpt());
throw new RuntimeException("Need Job Name to be specified");
} else {
jobName = cli.getOptionValue(JOB_NAME.getOpt());
log.info("Using job name: {}", jobName);
}
taskState.setProp(ConfigurationKeys.JOB_NAME_KEY, jobName);
String zkAddress = "locahost:2181";
if (cli.hasOption(ZK.getOpt())) {
zkAddress = cli.getOptionValue(ZK.getOpt());
}
log.info("Using zk address : {}", zkAddress);
taskState.setProp(StateStoreBasedWatermarkStorage.WATERMARK_STORAGE_TYPE_KEY, "zk");
taskState.setProp("state.store.zk.connectString", zkAddress);
if (cli.hasOption(ROOT_DIR.getOpt())) {
String rootDir = cli.getOptionValue(ROOT_DIR.getOpt());
taskState.setProp(StateStoreBasedWatermarkStorage.WATERMARK_STORAGE_CONFIG_PREFIX
+ ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, rootDir);
log.info("Setting root dir to {}", rootDir);
} else {
log.error("Need root directory specified");
printUsage(options);
return;
}
StateStoreBasedWatermarkStorage stateStoreBasedWatermarkStorage = new StateStoreBasedWatermarkStorage(taskState);
final AtomicBoolean stop = new AtomicBoolean(true);
if (cli.hasOption(WATCH.getOpt())) {
stop.set(false);
}
try {
if (!stop.get()) {
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
stop.set(true);
}
});
}
do {
boolean foundWatermark = false;
try {
for (CheckpointableWatermarkState wmState : stateStoreBasedWatermarkStorage.getAllCommittedWatermarks()) {
foundWatermark = true;
System.out.println(wmState.getProperties());
}
} catch (IOException ie) {
Throwables.propagate(ie);
}
if (!foundWatermark) {
System.out.println("No watermarks found.");
}
if (!stop.get()) {
Thread.sleep(1000);
}
} while (!stop.get());
} catch (Exception e) {
Throwables.propagate(e);
}
}