in modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionManager.java [228:268]
/**
 * Checks whether the table split points recorded in ZooKeeper match the current ones and, if
 * this node sorts first among the cached children, creates or refreshes the
 * {@code ZookeeperPath.FINDERS + "/splits"} node.
 *
 * <p>
 * Steps:
 * <ol>
 * <li>Poll {@code myESNode.getActualPath()} every 100 ms until it is non-null.</li>
 * <li>Compare this node's name with the lexicographically smallest child name in
 * {@code childrenCache}; only the smallest one proceeds.</li>
 * <li>If no splits node is cached, create it with the serialized current splits; otherwise
 * overwrite it only when the set of splits differs.</li>
 * </ol>
 *
 * <p>
 * No exception escapes this method: failures are logged. On interruption the thread's interrupt
 * status is restored so the owner of the thread can observe it.
 */
public void run() {
  try {
    String actualPath = myESNode.getActualPath();
    while (actualPath == null) {
      // NOTE(review): presumably the node has not been created in ZooKeeper yet - confirm.
      Thread.sleep(100);
      actualPath = myESNode.getActualPath();
    }
    final String myName = ZKPaths.getNodeFromPath(actualPath);

    // Only the node whose name sorts first performs the check. An empty cache yields false.
    boolean imFirst = childrenCache.getCurrentData().stream().map(ChildData::getPath)
        .map(ZKPaths::getNodeFromPath).sorted().findFirst().map(s -> s.equals(myName))
        .orElse(false);
    if (!imFirst) {
      return;
    }

    final String splitsPath = ZookeeperPath.FINDERS + "/splits";
    ChildData childData = childrenCache.getCurrentData(splitsPath);
    byte[] currSplitData = SerializedSplits.serializeTableSplits(env);

    if (childData == null) {
      // No splits node is present in the cache, so publish the current splits.
      curator.create().forPath(splitsPath, currSplitData);
      return;
    }

    // Compare as sets so that only a real change in split points triggers a write.
    HashSet<Bytes> zkSplits = new HashSet<>();
    SerializedSplits.deserialize(zkSplits::add, childData.getData());
    HashSet<Bytes> currentSplits = new HashSet<>();
    SerializedSplits.deserialize(currentSplits::add, currSplitData);
    if (!currentSplits.equals(zkSplits)) {
      curator.setData().forPath(splitsPath, currSplitData);
    }
  } catch (InterruptedException e) {
    log.debug("Interrupted while checking table split points.", e);
    // Catching InterruptedException clears the flag; restore it for the thread's owner.
    Thread.currentThread().interrupt();
  } catch (Exception e) {
    log.warn("Failed to checking table split points", e);
  }
}