public Map getSystemStreamPartitionCounts()

in samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java [218:284]


  /**
   * Builds, for each requested stream, a {@link SystemStreamMetadata} whose partition map holds one entry per
   * partition reported by the Kafka consumer's {@code partitionsFor} call.
   *
   * <p>Only the partition keys of the result are meaningful. Every partition maps to one shared placeholder
   * whose offset getters throw {@link NotImplementedException}, so callers must not read offsets from it.
   *
   * <p>The lookup is run through an {@link ExponentialSleepStrategy}. While the strategy's sleep count is below
   * {@code MAX_RETRIES_ON_EXCEPTION}, a failure is logged at WARN level and the retry loop is left open; once
   * that limit is reached the failure is logged at ERROR level, the loop is marked done and the failure is
   * rethrown wrapped in a {@link SamzaException}.
   *
   * @param streamNames names of the streams whose partitions should be looked up
   * @param cacheTTL not used by this implementation
   * @return map from stream name to metadata carrying that stream's partitions
   * @throws SamzaException if the lookup still fails once the retry limit has been reached
   */
  public Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames, long cacheTTL) {
    // This optimization omits actual metadata for performance. Instead, we inject a dummy for all partitions.
    final SystemStreamMetadata.SystemStreamPartitionMetadata dummySspm =
      new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null) {
        private final String msg =
            "getSystemStreamPartitionCounts does not populate SystemStreaMetadata info. Only number of partitions";

        @Override
        public String getOldestOffset() {
          throw new NotImplementedException(msg);
        }

        @Override
        public String getNewestOffset() {
          throw new NotImplementedException(msg);
        }

        @Override
        public String getUpcomingOffset() {
          throw new NotImplementedException(msg);
        }
      };

    ExponentialSleepStrategy strategy = new ExponentialSleepStrategy(DEFAULT_EXPONENTIAL_SLEEP_BACK_OFF_MULTIPLIER,
        DEFAULT_EXPONENTIAL_SLEEP_INITIAL_DELAY_MS, DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS);

    Function1<ExponentialSleepStrategy.RetryLoop, Map<String, SystemStreamMetadata>> fetchMetadataOperation =
      new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, Map<String, SystemStreamMetadata>>() {
        @Override
        public Map<String, SystemStreamMetadata> apply(ExponentialSleepStrategy.RetryLoop loop) {
          Map<String, SystemStreamMetadata> allMetadata = new HashMap<>();

          for (String streamName : streamNames) {
            List<PartitionInfo> partitionInfos =
                threadSafeKafkaConsumer.execute(consumer -> consumer.partitionsFor(streamName));
            LOG.debug("Stream {} has partitions {}", streamName, partitionInfos);

            if (partitionInfos == null) {
              // Name the stream instead of failing with a bare NullPointerException a few lines further down.
              // This is thrown from the same place the NullPointerException would have come from, so it takes
              // the same path out of this operation.
              throw new SamzaException(
                  String.format("Kafka returned no partition information for stream: %s", streamName));
            }

            Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>();
            for (PartitionInfo partitionInfo : partitionInfos) {
              // Every partition shares the same placeholder; only the key set carries information.
              partitionMetadata.put(new Partition(partitionInfo.partition()), dummySspm);
            }
            allMetadata.put(streamName, new SystemStreamMetadata(streamName, partitionMetadata));
          }

          // All streams were resolved: mark the retry loop as finished.
          loop.done();
          return allMetadata;
        }
      };

    Map<String, SystemStreamMetadata> result = strategy.run(fetchMetadataOperation,
        new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit>() {
          @Override
          public BoxedUnit apply(Exception exception, ExponentialSleepStrategy.RetryLoop loop) {
            if (loop.sleepCount() < MAX_RETRIES_ON_EXCEPTION) {
              LOG.warn(String.format("Fetching systemstreampartition counts for: %s threw an exception. Retrying.",
                  streamNames), exception);
            } else {
              LOG.error(String.format("Fetching systemstreampartition counts for: %s threw an exception.", streamNames),
                  exception);
              loop.done();
              throw new SamzaException(exception);
            }
            // Scala's Unit value, rather than null, for a callback typed as returning BoxedUnit.
            return BoxedUnit.UNIT;
          }
        }).get();

    LOG.info("SystemStream partition counts for system {}: {}", systemName, result);
    return result;
  }