public Supplier load()

in twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java [223:279]


      /**
       * Loads the value for {@code key} from the ZooKeeper node at {@code key.getPath()} and keeps it
       * refreshed through a data watch.
       *
       * <p>The call blocks on an internal "ready" future until the first fetch attempt has produced an
       * outcome. The returned supplier is built from a shared reference that the data callback updates
       * every time the node data is re-read, so it is not a one-time snapshot.
       *
       * @param key cache key; its {@code getPath()} gives the ZooKeeper path to read
       * @return the supplier produced by {@code createSupplier} over the reference holding the most
       *         recently decoded value ({@code null} if the node was reported as missing)
       * @throws Exception if waiting on the ready future fails, e.g. the first fetch failed with an
       *         error other than {@link KeeperException.NoNodeException}, or the wait was interrupted
       */
      public Supplier<T> load(final K key) throws Exception {
        // Completed (with a value, null, or an exception) once the first fetch attempt has an outcome.
        final SettableFuture<T> readyFuture = SettableFuture.create();
        // Shared holder for the latest decoded value; written by the callback, read via the supplier.
        final AtomicReference<T> resultValue = new AtomicReference<T>();

        // Fetch the node data once the node exists.
        // NOTE(review): actOnExists presumably retries the existence check every FAILURE_RETRY_SECONDS
        // and stops once readyFuture is done -- confirm against its definition.
        final String path = key.getPath();
        actOnExists(path, new Runnable() {
          @Override
          public void run() {
            // Callback shared by the initial getData call and every watch-triggered re-fetch.
            final FutureCallback<NodeData> dataCallback = new FutureCallback<NodeData>() {
              @Override
              public void onSuccess(NodeData result) {
                // Publish the latest data. After the first completion readyFuture.set(...) has no
                // further effect, but resultValue keeps tracking the newest value.
                T value = decodeNodeData(result, resultType);
                resultValue.set(value);
                readyFuture.set(value);
              }

              @Override
              public void onFailure(Throwable t) {
                LOG.error("Failed to fetch node data on {}", path, t);
                if (t instanceof KeeperException.NoNodeException) {
                  // A missing node is reported as a null value rather than as a failure.
                  resultValue.set(null);
                  readyFuture.set(null);
                  return;
                }

                // On error, simply invalidate the key so that it'll be fetched next time.
                invalidater.invalidate(key);
                readyFuture.setException(t);
              }
            };

            // Fetch node data and register a watch for subsequent changes.
            Futures.addCallback(zkClient.getData(path, new Watcher() {
              @Override
              public void process(WatchedEvent event) {
                if (!isRunning()) {
                  return;
                }
                if (event.getType() == Event.EventType.NodeDataChanged) {
                  // Node data changed: fetch it again, passing this watcher along so that later
                  // changes are observed as well.
                  Futures.addCallback(zkClient.getData(path, this), dataCallback, executorService);
                } else if (event.getType() == Event.EventType.NodeDeleted) {
                  // Node removed: drop the cached entry through the invalidater supplied for this
                  // loader (same as the failure path above) instead of a hard-coded cache, so the
                  // entry is removed from whichever cache this loader actually serves.
                  invalidater.invalidate(key);
                }
              }
            }), dataCallback, executorService);
          }
        }, readyFuture, FAILURE_RETRY_SECONDS, TimeUnit.SECONDS);

        // Block until the first fetch outcome is known; a failed fetch surfaces here as an exception.
        readyFuture.get();
        return createSupplier(resultValue);
      }