in uReplicator-Common/src/main/java/com/uber/stream/kafka/mirrormaker/common/core/WorkloadInfoRetriever.java [145:168]
/**
 * Returns the workload estimate for the given topic.
 *
 * <p>The result is the recorded sample with the highest bytes-per-second among the samples
 * that fall inside the look-back window. The window is {@code _estimationLookBackWindow}
 * when at least one sample is strictly newer than that value; otherwise it is
 * {@code _maxValidTimeMillis}. A sample whose age equals the chosen window is still
 * considered.
 *
 * @param topic name of the topic to look up
 * @return the highest-throughput sample inside the window, or {@code _defaultTopicWorkload}
 *     when the topic has no samples or none of them fall inside the window
 */
public TopicWorkload topicWorkload(String topic) {
  LinkedList<TopicWorkload> samples = _topicWorkloadMap.get(topic);
  if (samples == null || samples.isEmpty()) {
    return _defaultTopicWorkload;
  }

  long now = System.currentTimeMillis();

  // Switch to the estimation window as soon as one sample is recent enough for it;
  // otherwise fall back to the maximum validity period.
  long window = _maxValidTimeMillis;
  for (TopicWorkload sample : samples) {
    if (now - sample.getLastUpdate() < _estimationLookBackWindow) {
      window = _estimationLookBackWindow;
      break;
    }
  }

  // Pick the peak bytes-in rate inside the window. The strict comparison keeps the
  // earliest sample when several share the same rate.
  TopicWorkload best = null;
  for (TopicWorkload sample : samples) {
    boolean insideWindow = now - sample.getLastUpdate() <= window;
    if (insideWindow
        && (best == null || best.getBytesPerSecond() < sample.getBytesPerSecond())) {
      best = sample;
    }
  }

  if (best == null) {
    return _defaultTopicWorkload;
  }
  return best;
}