in modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java [311:359]
/**
 * Starts this oracle server.
 *
 * <p>
 * The startup sequence is order-dependent: create and start the Curator client, wait for the
 * connection listener to report a connection, start the server via {@code startServer()},
 * register for leader election under {@code ZookeeperPath.ORACLE_SERVER} using
 * {@code host:port} as the participant id, and finally start a {@link PathChildrenCache} on
 * {@code oraclePath} with this object registered as a listener.
 *
 * <p>
 * This method blocks, polling every 200 ms, until {@code cnxnListener} reports a connection.
 *
 * @throws IllegalStateException if the server has already been started
 * @throws Exception if any step of the startup sequence fails
 */
public synchronized void start() throws Exception {
  if (started) {
    // Carry a message so a double-start is diagnosable from the stack trace alone.
    throw new IllegalStateException("Oracle server has already been started");
  }

  // NOTE(review): if a later step throws, the resources created below are not released here;
  // confirm whether callers are expected to invoke the shutdown path after a failed start.
  curatorFramework = CuratorUtil.newAppCurator(env.getConfiguration());
  curatorFramework.getConnectionStateListenable().addListener(cnxnListener);
  curatorFramework.start();

  // Do not proceed until the connection listener has seen a connection.
  while (!cnxnListener.isConnected()) {
    UtilWaitThread.sleep(200);
  }

  // The server must be started before leader election because its port is part of the leader id.
  final InetSocketAddress addr = startServer();

  String leaderId = HostUtil.getHostName() + ":" + addr.getPort();
  leaderLatch = new LeaderLatch(curatorFramework, ZookeeperPath.ORACLE_SERVER, leaderId);
  log.info("Leader ID = " + leaderId);

  // Leadership callbacks are dispatched on this dedicated single-thread executor.
  execService = Executors.newSingleThreadExecutor(new FluoThreadFactory("Oracle Server Worker"));
  leaderLatch.addListener(new LeaderLatchListener() {
    @Override
    public void notLeader() {
      isLeader = false;

      if (started) {
        // if we stopped the server manually, we shouldn't halt
        Halt.halt("Oracle has lost leadership unexpectedly and is now halting.");
      }
    }

    @Override
    public void isLeader() {
      assumeLeadership();
    }
  }, execService);
  leaderLatch.start();

  // Watch the children of oraclePath (third argument: cache node data) and deliver events to this.
  pathChildrenCache = new PathChildrenCache(curatorFramework, oraclePath, true);
  DeprecationUtil.addListener(pathChildrenCache.getListenable(), this);
  pathChildrenCache.start();

  // Re-check the connection before declaring the server started.
  while (!cnxnListener.isConnected()) {
    UtilWaitThread.sleep(200);
  }

  log.info("Listening " + addr);

  // Set last: the notLeader() callback above only halts once this flag is true.
  started = true;
}