in uReplicator-Worker/src/main/java/com/uber/stream/ureplicator/worker/ConsumerFetcherThread.java [92:132]
/**
 * Runs one fetch iteration while holding {@code partitionMapLock}.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Calls {@code refreshPartitionMap()}. When it reports a change, the consumer is
 *       re-assigned to the keys of {@code partitionMap}, every entry of
 *       {@code partitionResetOffsetMap} with a non-negative offset is applied via
 *       {@code seek}, and that map is cleared.</li>
 *   <li>Polls the consumer (timeout {@code pollTimeoutMs}) when {@code partitionMap} is
 *       non-empty.</li>
 *   <li>When there are no partitions or nothing was fetched, waits on
 *       {@code partitionMapLock} for up to {@code fetchBackOffMs} and returns.</li>
 *   <li>Otherwise passes the records to {@code processFetchedData} and calls
 *       {@code logTopicPartitionInfo}.</li>
 * </ol>
 *
 * <p>Any {@link Throwable} raised by these steps is logged and not rethrown. The consumer is
 * then sought back to each partition's recorded fetch offset so that records fetched but not
 * processed are read again. If the failure was an {@link InterruptedException}, the thread's
 * interrupt status is restored before returning.
 */
public void doWork() {
  synchronized (partitionMapLock) {
    try {
      ConsumerRecords records = null;
      boolean topicChanged = refreshPartitionMap();
      if (topicChanged) {
        LOGGER.info("[{}]Assignment changed, partitionMap {}, partitionResetOffsetMap: {} ",
            getName(),
            partitionMap.keySet(),
            partitionResetOffsetMap);
        kafkaConsumer.assign(partitionMap.keySet());
        for (Map.Entry<TopicPartition, Long> entry : partitionResetOffsetMap.entrySet()) {
          TopicPartition tp = entry.getKey();
          long offset = entry.getValue();
          // Negative offsets are skipped; no seek is issued for them.
          if (offset >= 0) {
            kafkaConsumer.seek(tp, offset);
          }
        }
        partitionResetOffsetMap.clear();
      }
      if (partitionMap.size() != 0) {
        records = kafkaConsumer.poll(pollTimeoutMs);
      }
      if (partitionMap.size() == 0 || records == null || records.isEmpty()) {
        // Nothing to process: back off. Object.wait releases partitionMapLock while waiting.
        partitionMapLock.wait(fetchBackOffMs);
        return;
      }
      processFetchedData(records);
      logTopicPartitionInfo();
    } catch (Throwable e) {
      if (e instanceof InterruptedException) {
        // Object.wait() clears the interrupt status when it throws; restore it so the code
        // driving this thread can still observe the interruption.
        Thread.currentThread().interrupt();
      }
      LOGGER.error("[{}]: Catch Throwable exception: {}", getName(), e.getMessage(), e);
      // reset offset to fetchoffset to avoid data loss
      seekBackToFetchOffsets();
    }
  }
}

/**
 * Seeks the consumer back to the recorded fetch offset of every partition in
 * {@code partitionMap} whose fetch offset is non-negative.
 *
 * <p>Each seek is guarded individually: a failure for one partition (for example a partition
 * that is in {@code partitionMap} but was never successfully assigned to the consumer) is
 * logged and does not prevent the remaining partitions from being reset, nor does it escape
 * to the caller.
 */
private void seekBackToFetchOffsets() {
  for (PartitionOffsetInfo offsetInfo : partitionMap.values()) {
    if (offsetInfo.fetchOffset() < 0) {
      continue;
    }
    try {
      kafkaConsumer.seek(offsetInfo.topicPartition(), offsetInfo.fetchOffset());
    } catch (RuntimeException seekFailure) {
      LOGGER.error("[{}]: Failed to reset offset for {} to {}",
          getName(),
          offsetInfo.topicPartition(),
          offsetInfo.fetchOffset(),
          seekFailure);
    }
  }
}