in uReplicator-Common/src/main/java/com/uber/stream/kafka/mirrormaker/common/core/WorkloadInfoRetriever.java [209:237]
/**
 * Queries C3 for the in-rate of the given topics and merges the returned data points into
 * {@code _topicWorkloadMap}.
 *
 * <p>For every topic in the response that also has an entry in {@code topicsPartitions}, the
 * returned workload is stamped with that partition count and appended to the topic's list, but
 * only when the list is empty or the workload's last-update value is strictly greater than that
 * of the newest stored entry. Afterwards, entries at the head of that topic's list whose
 * last-update value lies more than {@code _maxValidTimeMillis} before the time captured at
 * method entry are removed. All map and list mutations happen while holding the monitor of
 * {@code _topicWorkloadMap}.
 *
 * <p>NOTE(review): only topics present in the current response are purged here; lists of topics
 * missing from the response are left untouched by this method - confirm that is intended.
 *
 * @param timeInMs timestamp forwarded to {@code C3QueryUtils.retrieveTopicInRate} (presumably
 *     the query time in epoch milliseconds - verify against C3QueryUtils)
 * @param windowInMs window forwarded to the same query (presumably its length in milliseconds)
 * @param topicsPartitions topic name to partition count; its key set is the list of topics
 *     queried, and response topics without an entry here are ignored
 * @throws IOException declared by this method; presumably propagated from the C3 query
 */
private void retrieveWorkload(long timeInMs, long windowInMs, Map<String, Integer> topicsPartitions)
    throws IOException {
  // Captured before the remote call on purpose: this value is the reference point for purging.
  final long now = System.currentTimeMillis();
  final Map<String, TopicWorkload> fetched = C3QueryUtils.retrieveTopicInRate(timeInMs, windowInMs,
      _c3Host, _c3Port, _srcKafkaCluster, new ArrayList<>(topicsPartitions.keySet()));
  LOGGER.info("Retrieved workload for ts: {} for srcKafkaCluster: {} and {} topics", timeInMs, _srcKafkaCluster, topicsPartitions.size());

  synchronized (_topicWorkloadMap) {
    for (Map.Entry<String, TopicWorkload> fetchedEntry : fetched.entrySet()) {
      final String topicName = fetchedEntry.getKey();
      final TopicWorkload sample = fetchedEntry.getValue();
      final Integer partitionCount = topicsPartitions.get(topicName);
      if (partitionCount == null) {
        // Topic was returned by the query but is not tracked by the caller; skip it.
        continue;
      }
      sample.setParitions(partitionCount);

      LinkedList<TopicWorkload> history = _topicWorkloadMap.get(topicName);
      if (history == null) {
        history = new LinkedList<>();
        _topicWorkloadMap.put(topicName, history);
      }

      // Append only data points strictly newer than the latest one already stored.
      final boolean isNewer =
          history.isEmpty() || history.getLast().getLastUpdate() < sample.getLastUpdate();
      if (isNewer) {
        history.addLast(sample);
      }

      // Drop data points from the head that have fallen out of the valid window.
      while (!history.isEmpty()) {
        final boolean expired = now - history.getFirst().getLastUpdate() > _maxValidTimeMillis;
        if (!expired) {
          break;
        }
        history.removeFirst();
      }
    }
  }
}