in uReplicator-Worker/src/main/java/com/uber/stream/ureplicator/worker/FetcherManager.java [161:200]
protected void addFetcherForTopicPartition(
TopicPartition tp, PartitionOffsetInfo offsetInfo, String fetcherThreadName) {
LOGGER.info("Enter add fetcher thread for partitions {}, fetcherThread {}", tp,
fetcherThreadName);
synchronized (fetcherMapLock) {
if (StringUtils.isBlank(fetcherThreadName)) {
LOGGER.warn("Unexpected behavior, can't find threadName for topic partition {}", tp);
return;
}
ConsumerFetcherThread fetcherThread = fetcherThreadMap
.getOrDefault(fetcherThreadName, null);
if (fetcherThread == null) {
try {
int queueIndex = Math.abs(fetcherThreadName.hashCode() % messageQueue.size());
LOGGER.info("Creating fetcher thread {}", fetcherThreadName);
CustomizedConsumerConfig cloned = (CustomizedConsumerConfig) consumerProperties.clone();
String clientIdPrefix = consumerProperties
.getProperty(ConsumerConfig.CLIENT_ID_CONFIG, "ureplicator");
int fetcherThreadId = fetcherId.incrementAndGet();
// Kafka Consumer doesn't support having two kafka consumer using the same client id,
// It throws error: WARN Error registering AppInfo mbean
cloned.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,
String.format("%s-%d", clientIdPrefix, fetcherThreadId));
fetcherThread = createConsumerFetcherThread(fetcherThreadName, cloned,
messageLimiter, messageQueue.get(queueIndex));
fetcherThread.start();
fetcherThreadMap.put(fetcherThreadName, fetcherThread);
LOGGER.info("Fetcher fetcher thread {} created", fetcherThreadName);
} catch (Exception e) {
LOGGER.error("Failed to create new fetcher thread {}", getName(), e);
return;
}
}
fetcherThread.addPartitions(ImmutableMap.of(tp, offsetInfo));
partitionThreadMap.putIfAbsent(tp, fetcherThreadName);
}
LOGGER.info("Add fetcher thread for partitions {} finished", tp);
}