public void announce()

in server/src/main/java/org/apache/druid/curator/announcement/Announcer.java [192:330]


  public void announce(String path, byte[] bytes, boolean removeParentIfCreated)
  {
    synchronized (toAnnounce) {
      if (!started) {
        toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated));
        return;
      }
    }

    final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);

    final String parentPath = pathAndNode.getPath();
    boolean buildParentPath = false;

    ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath);

    if (subPaths == null) {
      try {
        if (curator.checkExists().forPath(parentPath) == null) {
          buildParentPath = true;
        }
      }
      catch (Exception e) {
        log.debug(e, "Problem checking if the parent existed, ignoring.");
      }

      // I don't have a watcher on this path yet, create a Map and start watching.
      announcements.putIfAbsent(parentPath, new ConcurrentHashMap<>());

      // Guaranteed to be non-null, but might be a map put in there by another thread.
      final ConcurrentMap<String, byte[]> finalSubPaths = announcements.get(parentPath);

      // Synchronize to make sure that I only create a listener once.
      synchronized (finalSubPaths) {
        if (!listeners.containsKey(parentPath)) {
          final PathChildrenCache cache = factory.make(curator, parentPath);
          cache.getListenable().addListener(
              new PathChildrenCacheListener()
              {
                private final AtomicReference<Set<String>> pathsLost = new AtomicReference<>(null);

                @Override
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
                {
                  // NOTE: ZooKeeper does not guarantee that we will get every event, and thus PathChildrenCache doesn't
                  // as well. If one of the below events are missed, Announcer might not work properly.
                  log.debug("Path[%s] got event[%s]", parentPath, event);
                  switch (event.getType()) {
                    case CHILD_REMOVED:
                      final ChildData child = event.getData();
                      final ZKPaths.PathAndNode childPath = ZKPaths.getPathAndNode(child.getPath());
                      final byte[] value = finalSubPaths.get(childPath.getNode());
                      if (value != null) {
                        log.info("Node[%s] dropped, reinstating.", child.getPath());
                        createAnnouncement(child.getPath(), value);
                      }
                      break;
                    case CONNECTION_LOST:
                      // Lost connection, which means session is broken, take inventory of what has been seen.
                      // This is to protect from a race condition in which the ephemeral node could have been
                      // created but not actually seen by the PathChildrenCache, which means that it won't know
                      // that it disappeared and thus will not generate a CHILD_REMOVED event for us.  Under normal
                      // circumstances, this can only happen upon connection loss; but technically if you have
                      // an adversary in the system, they could also delete the ephemeral node before the cache sees
                      // it.  This does not protect from that case, so don't have adversaries.

                      Set<String> pathsToReinstate = new HashSet<>();
                      for (String node : finalSubPaths.keySet()) {
                        String path = ZKPaths.makePath(parentPath, node);
                        log.info("Node[%s] is added to reinstate.", path);
                        pathsToReinstate.add(path);
                      }

                      if (!pathsToReinstate.isEmpty() && !pathsLost.compareAndSet(null, pathsToReinstate)) {
                        log.info("Already had a pathsLost set!?[%s]", parentPath);
                      }
                      break;
                    case CONNECTION_RECONNECTED:
                      final Set<String> thePathsLost = pathsLost.getAndSet(null);

                      if (thePathsLost != null) {
                        for (String path : thePathsLost) {
                          log.info("Reinstating [%s]", path);
                          final ZKPaths.PathAndNode split = ZKPaths.getPathAndNode(path);
                          createAnnouncement(path, announcements.get(split.getPath()).get(split.getNode()));
                        }
                      }
                      break;
                    case CHILD_ADDED:
                      if (addedChildren != null) {
                        addedChildren.add(event.getData().getPath());
                      }
                      // fall through
                    case INITIALIZED:
                    case CHILD_UPDATED:
                    case CONNECTION_SUSPENDED:
                      // do nothing
                  }
                }
              }
          );

          synchronized (toAnnounce) {
            if (started) {
              if (buildParentPath) {
                createPath(parentPath, removeParentIfCreated);
              }
              startCache(cache);
              listeners.put(parentPath, cache);
            }
          }
        }
      }

      subPaths = finalSubPaths;
    }

    boolean created = false;
    synchronized (toAnnounce) {
      if (started) {
        byte[] oldBytes = subPaths.putIfAbsent(pathAndNode.getNode(), bytes);

        if (oldBytes == null) {
          created = true;
        } else if (!Arrays.equals(oldBytes, bytes)) {
          throw new IAE("Cannot reannounce different values under the same path");
        }
      }
    }

    if (created) {
      try {
        createAnnouncement(path, bytes);
      }
      catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
  }