in uReplicator-Worker/src/main/java/com/uber/stream/ureplicator/worker/FetcherManager.java [273:320]
/**
 * Runs one reconciliation pass over the pending partition changes, then sleeps for
 * {@code refreshBackoff} milliseconds.
 *
 * <p>While holding {@code updateMapLock}, the pass:
 * <ol>
 *   <li>hands {@code partitionAddMap} to {@code addTopicPartitionsToFetcherThread} and moves
 *       every entry it returns from {@code partitionAddMap} into {@code partitionMap};</li>
 *   <li>logs whatever is still left in {@code partitionAddMap} (those entries stay queued);</li>
 *   <li>queues every partition in {@code partitionMap} whose {@code consumedEndBounded()} is
 *       true into {@code partitionDeleteMap};</li>
 *   <li>drops every queued partition that is present in {@code partitionMap} from both
 *       {@code partitionMap} and {@code partitionThreadMap}, calls
 *       {@code removeFetcherForPartitions} with the queued keys, and clears
 *       {@code partitionDeleteMap}.</li>
 * </ol>
 *
 * <p>Any {@link Throwable} raised during the pass is logged and not rethrown; in that case
 * {@code partitionDeleteMap} is not cleared, so its entries are still queued on the next call.
 *
 * <p>If the sleep is interrupted, the interrupt status of the current thread is restored
 * before returning.
 */
public void doWork() {
  synchronized (updateMapLock) {
    try {
      if (!partitionAddMap.isEmpty()) {
        Map<TopicPartition, PartitionOffsetInfo> newPartitionMap =
            addTopicPartitionsToFetcherThread(partitionAddMap);
        // Promote only the partitions the helper reported back; the rest remain pending.
        for (Map.Entry<TopicPartition, PartitionOffsetInfo> entry : newPartitionMap.entrySet()) {
          partitionMap.put(entry.getKey(), entry.getValue());
          partitionAddMap.remove(entry.getKey());
        }
      }
      if (!partitionAddMap.isEmpty()) {
        LOGGER.info("topic partitions not successfully add to fetcher thread: {}",
            partitionAddMap.keySet());
      }
      // FIXME: propagate topic partition change to controller
      // Remove topic partition already finished
      for (PartitionOffsetInfo partition : partitionMap.values()) {
        if (partition.consumedEndBounded()) {
          partitionDeleteMap.put(partition.topicPartition(), true);
        }
      }
      // Forget bookkeeping only for partitions that are actually tracked in partitionMap.
      for (TopicPartition partition : partitionDeleteMap.keySet()) {
        if (partitionMap.containsKey(partition)) {
          partitionMap.remove(partition);
          partitionThreadMap.remove(partition);
        }
      }
      // The full delete set is passed on, including keys that were not in partitionMap.
      if (!partitionDeleteMap.isEmpty()) {
        removeFetcherForPartitions(partitionDeleteMap.keySet());
      }
      partitionDeleteMap.clear();
    } catch (Throwable t) {
      LOGGER.error("[{}]: Catch Throwable exception: {}", getName(), t.getMessage(), t);
    }
  }
  // Back off outside the lock so other users of updateMapLock are not blocked while sleeping.
  try {
    Thread.sleep(refreshBackoff);
  } catch (InterruptedException e) {
    LOGGER.error("[{}]InterruptedException on sleep", getName());
    // Thread.sleep clears the interrupt status when it throws; restore it so the caller can
    // still observe the interruption.
    // NOTE(review): presumably the loop invoking doWork() stops on shutdown/interrupt — confirm
    // it does not keep calling doWork() while the thread remains interrupted.
    Thread.currentThread().interrupt();
  }
}