private void retrieveWorkload()

in uReplicator-Common/src/main/java/com/uber/stream/kafka/mirrormaker/common/core/WorkloadInfoRetriever.java [209:237]


  /**
   * Queries {@code C3QueryUtils.retrieveTopicInRate} for the given timestamp and window and merges
   * the returned per-topic workloads into {@code _topicWorkloadMap}.
   *
   * <p>For every returned topic that also appears in {@code topicsPartitions}, the workload is
   * stamped with that topic's partition count, appended to the topic's history if the history is
   * empty or the workload is strictly newer than the last recorded one, and then entries older
   * than {@code _maxValidTimeMillis} (measured against the wall clock captured at method entry)
   * are dropped from the front of the history. Returned topics that are absent from
   * {@code topicsPartitions} are ignored.
   *
   * @param timeInMs timestamp forwarded to the query
   * @param windowInMs window length forwarded to the query
   * @param topicsPartitions topic name to partition count; its key set is the list of topics queried
   * @throws IOException declared by this method; presumably raised by the remote query (confirm
   *     against {@code C3QueryUtils})
   */
  private void retrieveWorkload(long timeInMs, long windowInMs, Map<String, Integer> topicsPartitions)
      throws IOException {
    // Captured before the remote call; the purge below is evaluated against this instant.
    final long now = System.currentTimeMillis();
    final Map<String, TopicWorkload> fetched = C3QueryUtils.retrieveTopicInRate(timeInMs, windowInMs,
        _c3Host, _c3Port, _srcKafkaCluster, new ArrayList<>(topicsPartitions.keySet()));
    LOGGER.info("Retrieved workload for ts: {} for srcKafkaCluster: {} and {} topics", timeInMs, _srcKafkaCluster, topicsPartitions.size());

    synchronized (_topicWorkloadMap) {
      for (Map.Entry<String, TopicWorkload> fetchedEntry : fetched.entrySet()) {
        final String topicName = fetchedEntry.getKey();
        final TopicWorkload latest = fetchedEntry.getValue();
        final Integer partitionCount = topicsPartitions.get(topicName);
        if (partitionCount == null) {
          // Topic was not part of the request map; nothing to record for it.
          continue;
        }
        latest.setParitions(partitionCount);

        LinkedList<TopicWorkload> history = _topicWorkloadMap.get(topicName);
        if (history == null) {
          history = new LinkedList<>();
          _topicWorkloadMap.put(topicName, history);
        }

        // Append only data points that are strictly newer than the most recent one kept.
        final boolean isNewerThanTail =
            history.isEmpty() || history.getLast().getLastUpdate() < latest.getLastUpdate();
        if (isNewerThanTail) {
          history.add(latest);
        }

        // Drop leading data points that have fallen out of the valid window.
        while (!history.isEmpty()) {
          if (now - history.getFirst().getLastUpdate() <= _maxValidTimeMillis) {
            break;
          }
          history.removeFirst();
        }
      }
    }
  }