private void updatePartitionInfo()

in modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionManager.java [154:214]


  /**
   * Recomputes this worker's partition info from the nodes currently held in
   * {@code childrenCache} and publishes it via {@code setPartitionInfo}.
   *
   * <p>
   * The method first waits (polling every 100 ms) until {@code myESNode} reports an actual path,
   * then scans the cached children: the node named {@code "splits"} supplies the serialized
   * splits, and every other node is treated as a worker whose data is its group size encoded as
   * a UTF-8 string.
   *
   * <p>
   * Partition info is set to {@code null} and a retry is scheduled when any of the following
   * holds:
   * <ul>
   * <li>no {@code "splits"} node is present,</li>
   * <li>this worker's own node is not among the children,</li>
   * <li>the workers do not all report the same group size as this worker's {@code groupSize},</li>
   * <li>any exception other than {@link InterruptedException} is thrown.</li>
   * </ul>
   *
   * <p>
   * If the thread is interrupted, the interruption is logged at debug level, the thread's
   * interrupt status is restored, and the method returns without changing the partition info or
   * scheduling a retry.
   */
  private void updatePartitionInfo() {
    try {
      // The actual path may not be available yet; poll until it is.
      String me = myESNode.getActualPath();
      while (me == null) {
        Thread.sleep(100);
        me = myESNode.getActualPath();
      }
      me = ZKPaths.getNodeFromPath(me);

      byte[] zkSplitData = null;
      SortedSet<String> children = new TreeSet<>();
      Set<String> groupSizes = new HashSet<>();
      for (ChildData childData : childrenCache.getCurrentData()) {
        String node = ZKPaths.getNodeFromPath(childData.getPath());
        if (node.equals("splits")) {
          zkSplitData = childData.getData();
        } else {
          // Every non-splits child is a worker; its data is the group size it was started with.
          children.add(node);
          groupSizes.add(new String(childData.getData(), UTF_8));
        }
      }

      if (zkSplitData == null) {
        log.info("Did not find splits in zookeeper, will retry later.");
        setPartitionInfo(null); // disable this worker from processing notifications
        scheduleRetry();
        return;
      }

      if (!children.contains(me)) {
        log.warn("Did not see self (" + me
            + "), cannot gather tablet and notification partitioning info.");
        setPartitionInfo(null); // disable this worker from processing notifications
        scheduleRetry();
        return;
      }

      // ensure all workers agree on the group size
      if (groupSizes.size() != 1 || !groupSizes.contains(groupSize + "")) {
        log.warn("Group size disagreement " + groupSize + " " + groupSizes
            + ", cannot gather tablet and notification partitioning info.");
        setPartitionInfo(null); // disable this worker from processing notifications
        scheduleRetry();
        return;
      }

      List<Bytes> zkSplits = new ArrayList<>();
      SerializedSplits.deserialize(zkSplits::add, zkSplitData);

      Collection<TableRange> tableRanges = TableRange.fromBytes(zkSplits);
      PartitionInfo newPI = getGroupInfo(me, children, tableRanges, groupSize);

      setPartitionInfo(newPI);
    } catch (InterruptedException e) {
      log.debug("Interrupted while gathering tablet and notification partitioning info.", e);
      // Thread.sleep cleared the interrupt flag when it threw; restore it so that callers
      // further up the stack can still observe that this thread was asked to stop.
      Thread.currentThread().interrupt();
    } catch (Exception e) {
      log.warn("Problem gathering tablet and notification partitioning info.", e);
      setPartitionInfo(null); // disable this worker from processing notifications
      scheduleRetry();
    }
  }