private CacheLoader createServiceLoader()

in twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java [322:363]


  private CacheLoader<String, ServiceDiscovered> createServiceLoader() {
    return new CacheLoader<String, ServiceDiscovered>() {
      @Override
      public ServiceDiscovered load(String service) throws Exception {
        final DefaultServiceDiscovered serviceDiscovered = new DefaultServiceDiscovered(service);
        final String serviceBase = "/" + service;

        // Watch for children changes in /service
        ZKOperations.watchChildren(zkClient, serviceBase, new ZKOperations.ChildrenCallback() {
          @Override
          public void updated(NodeChildren nodeChildren) {
            // Fetch data of all children nodes in parallel.
            List<String> children = nodeChildren.getChildren();
            List<OperationFuture<NodeData>> dataFutures = Lists.newArrayListWithCapacity(children.size());
            for (String child : children) {
              dataFutures.add(zkClient.getData(serviceBase + "/" + child));
            }

            // Update the service map when all fetching are done.
            final ListenableFuture<List<NodeData>> fetchFuture = Futures.successfulAsList(dataFutures);
            fetchFuture.addListener(new Runnable() {
              @Override
              public void run() {
                ImmutableSet.Builder<Discoverable> builder = ImmutableSet.builder();
                for (NodeData nodeData : Futures.getUnchecked(fetchFuture)) {
                  // For successful fetch, decode the content.
                  if (nodeData != null) {
                    Discoverable discoverable = DiscoverableAdapter.decode(nodeData.getData());
                    if (discoverable != null) {
                      builder.add(discoverable);
                    }
                  }
                }
                serviceDiscovered.setDiscoverables(builder.build());
              }
            }, Threads.SAME_THREAD_EXECUTOR);
          }
        });
        return serviceDiscovered;
      }
    };
  }