public void run()

in gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorageCli.java [60:157]


  /**
   * Parses the command line and prints the properties of every committed watermark
   * returned by a ZooKeeper-configured {@link StateStoreBasedWatermarkStorage} to stdout.
   *
   * <p>The job name and the state store root directory are required. The ZooKeeper
   * connect string defaults to {@code localhost:2181}. When the watch option is present,
   * the watermarks are re-read and re-printed once per second until a JVM shutdown hook
   * fires; otherwise they are printed exactly once.
   *
   * @param args raw command line; the first element is skipped before option parsing
   *             (presumably it is the sub-command name supplied by the launcher - confirm
   *             against the caller)
   * @throws RuntimeException if the job name option is missing, or if reading the
   *                          watermarks fails or the watch loop is interrupted
   */
  public void run(String[] args) {
    Options options = new Options();
    options.addOption(HELP);
    options.addOption(ZK);
    options.addOption(JOB_NAME);
    options.addOption(ROOT_DIR);
    options.addOption(WATCH);

    // Arrays.copyOfRange(args, 1, 0) would throw IllegalArgumentException, so guard
    // against a null or empty argument array instead of crashing before parsing.
    String[] optionArgs = (args == null || args.length < 2)
        ? new String[0]
        : Arrays.copyOfRange(args, 1, args.length);

    CommandLine cli;
    try {
      CommandLineParser parser = new DefaultParser();
      cli = parser.parse(options, optionArgs);
    } catch (ParseException pe) {
      System.out.println("Command line parse exception: " + pe.getMessage());
      return;
    }

    if (cli.hasOption(HELP.getOpt())) {
      printUsage(options);
      return;
    }

    TaskState taskState = new TaskState();

    if (!cli.hasOption(JOB_NAME.getOpt())) {
      // The "{}" placeholder is required for the option name to actually appear in the log.
      log.error("Need Job Name to be specified --{}", JOB_NAME.getLongOpt());
      throw new RuntimeException("Need Job Name to be specified");
    }
    String jobName = cli.getOptionValue(JOB_NAME.getOpt());
    log.info("Using job name: {}", jobName);
    taskState.setProp(ConfigurationKeys.JOB_NAME_KEY, jobName);

    // Default to a local ZooKeeper unless an address was given on the command line.
    String zkAddress = "localhost:2181";
    if (cli.hasOption(ZK.getOpt())) {
      zkAddress = cli.getOptionValue(ZK.getOpt());
    }
    log.info("Using zk address : {}", zkAddress);

    taskState.setProp(StateStoreBasedWatermarkStorage.WATERMARK_STORAGE_TYPE_KEY, "zk");
    taskState.setProp("state.store.zk.connectString", zkAddress);

    if (!cli.hasOption(ROOT_DIR.getOpt())) {
      log.error("Need root directory specified");
      printUsage(options);
      return;
    }
    String rootDir = cli.getOptionValue(ROOT_DIR.getOpt());
    taskState.setProp(StateStoreBasedWatermarkStorage.WATERMARK_STORAGE_CONFIG_PREFIX
        + ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, rootDir);
    log.info("Setting root dir to {}", rootDir);

    StateStoreBasedWatermarkStorage stateStoreBasedWatermarkStorage = new StateStoreBasedWatermarkStorage(taskState);

    // In one-shot mode the flag starts out true so the loop body runs exactly once;
    // in watch mode it is flipped to true by the shutdown hook.
    final boolean watch = cli.hasOption(WATCH.getOpt());
    final AtomicBoolean stop = new AtomicBoolean(!watch);

    try {
      if (watch) {
        Runtime.getRuntime().addShutdownHook(new Thread() {
          @Override
          public void run() {
            stop.set(true);
          }
        });
      }

      do {
        boolean foundWatermark = false;
        for (CheckpointableWatermarkState wmState : stateStoreBasedWatermarkStorage.getAllCommittedWatermarks()) {
          foundWatermark = true;
          System.out.println(wmState.getProperties());
        }

        if (!foundWatermark) {
          System.out.println("No watermarks found.");
        }
        if (!stop.get()) {
          Thread.sleep(1000);
        }
      } while (!stop.get());
    } catch (InterruptedException ie) {
      // Restore the interrupt flag so code further up the stack can observe it,
      // then surface the failure the same way as any other error.
      Thread.currentThread().interrupt();
      throw Throwables.propagate(ie);
    } catch (Exception e) {
      // Covers the IOException from reading watermarks; RuntimeExceptions pass through unchanged.
      throw Throwables.propagate(e);
    }
  }