public void start()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/controller/coordinator/LeaderSelector.java [64:111]


  public void start() {
    if (curatorFramework.getState() == CuratorFrameworkState.LATENT) {
      curatorFramework.start();
      logger.info("leader.selector.client.started", StructuredArguments.keyValue(ID, nodeId));

      try {
        if (curatorFramework.checkExists().forPath(latchPath) == null) {
          curatorFramework.create().forPath(latchPath);
          logger.info(
              "leader.selector.path.created", StructuredArguments.keyValue(ZK_PATH, latchPath));
        }
      } catch (Exception e) {
        logger.error(
            "leaderselector failed to create latch path",
            StructuredArguments.keyValue(ZK_PATH, latchPath),
            e);
        throw new RuntimeException(e);
      }
    }
    if (leaderLatch.getState() == LeaderLatch.State.LATENT) {
      leaderLatch.addListener(
          new LeaderLatchListener() {
            @Override
            public void isLeader() {
              infra.scope().gauge(MetricsNames.IS_LEADER).update(1);
              logger.info(
                  "leader.selector.leader.acquired", StructuredArguments.keyValue(ID, nodeId));
            }

            @Override
            public void notLeader() {
              infra.scope().gauge(MetricsNames.IS_LEADER).update(0);
              logger.info(
                  "leader.selector.leader.notLeader", StructuredArguments.keyValue(ID, nodeId));
            }
          });
      try {
        leaderLatch.start();
        logger.info("leader.selector.latch.started", StructuredArguments.keyValue(ID, nodeId));
        infra.scope().gauge(MetricsNames.LATCH_STARTED).update(1);
      } catch (Exception e) {
        logger.error("trying to start leaderLatch failed", e);
        infra.scope().gauge(MetricsNames.LATCH_FAILED).update(1);
        throw new RuntimeException(e);
      }
    }
    running.set(true);
  }