in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/controller/coordinator/LeaderSelector.java [64:111]
public void start() {
  // Starts the selector in three steps:
  //   1. If the Curator client has not been started yet, start it and make sure the latch
  //      path exists.
  //   2. If the leader latch has not been started yet, register a listener that reports
  //      leadership changes (IS_LEADER gauge + log line) and start the latch.
  //   3. Mark this selector as running.
  // Any failure in step 1 or 2 is logged and rethrown wrapped in a RuntimeException.
  if (curatorFramework.getState() == CuratorFrameworkState.LATENT) {
    curatorFramework.start();
    logger.info("leader.selector.client.started", StructuredArguments.keyValue(ID, nodeId));
    try {
      if (curatorFramework.checkExists().forPath(latchPath) == null) {
        try {
          curatorFramework.create().forPath(latchPath);
          logger.info(
              "leader.selector.path.created", StructuredArguments.keyValue(ZK_PATH, latchPath));
        } catch (Exception createFailure) {
          // checkExists() followed by create() is not atomic: another node may have created
          // the path in between. If the path is present now, the goal is met and the failed
          // create is harmless; otherwise the failure is genuine and must propagate.
          if (createFailure instanceof InterruptedException
              || curatorFramework.checkExists().forPath(latchPath) == null) {
            throw createFailure;
          }
          logger.info(
              "leader.selector.path.exists", StructuredArguments.keyValue(ZK_PATH, latchPath));
        }
      }
    } catch (Exception e) {
      if (e instanceof InterruptedException) {
        // Keep the interrupt visible to callers; wrapping alone would silently drop it.
        Thread.currentThread().interrupt();
      }
      logger.error(
          "leaderselector failed to create latch path",
          StructuredArguments.keyValue(ZK_PATH, latchPath),
          e);
      throw new RuntimeException(e);
    }
  }

  if (leaderLatch.getState() == LeaderLatch.State.LATENT) {
    // The listener is registered before the latch is started so the first leadership
    // notification is not missed.
    leaderLatch.addListener(
        new LeaderLatchListener() {
          @Override
          public void isLeader() {
            infra.scope().gauge(MetricsNames.IS_LEADER).update(1);
            logger.info(
                "leader.selector.leader.acquired", StructuredArguments.keyValue(ID, nodeId));
          }

          @Override
          public void notLeader() {
            infra.scope().gauge(MetricsNames.IS_LEADER).update(0);
            logger.info(
                "leader.selector.leader.notLeader", StructuredArguments.keyValue(ID, nodeId));
          }
        });
    try {
      leaderLatch.start();
      logger.info("leader.selector.latch.started", StructuredArguments.keyValue(ID, nodeId));
      infra.scope().gauge(MetricsNames.LATCH_STARTED).update(1);
    } catch (Exception e) {
      if (e instanceof InterruptedException) {
        // Keep the interrupt visible to callers; wrapping alone would silently drop it.
        Thread.currentThread().interrupt();
      }
      logger.error("trying to start leaderLatch failed", e);
      infra.scope().gauge(MetricsNames.LATCH_FAILED).update(1);
      throw new RuntimeException(e);
    }
  }

  running.set(true);
}