protected void addFetcherForTopicPartition()

in uReplicator-Worker/src/main/java/com/uber/stream/ureplicator/worker/FetcherManager.java [161:200]


  /**
   * Assigns a topic partition to the fetcher thread registered under {@code fetcherThreadName},
   * creating and starting that thread first when no thread is registered under that name.
   *
   * <p>All lookups and updates of {@code fetcherThreadMap} and {@code partitionThreadMap} happen
   * while holding {@code fetcherMapLock}. A newly created thread receives a clone of
   * {@code consumerProperties} whose client id is the configured prefix (default
   * {@code "ureplicator"}) followed by the next value of the {@code fetcherId} counter, and is
   * bound to the entry of {@code messageQueue} selected from the hash of the thread name.
   *
   * <p>The method returns without adding the partition when {@code fetcherThreadName} is blank
   * or when creating/starting the fetcher thread throws; in the latter case the exception is
   * logged and not propagated.
   *
   * @param tp the topic partition to hand to the fetcher thread
   * @param offsetInfo offset information passed to the fetcher thread together with {@code tp}
   * @param fetcherThreadName key of the fetcher thread in {@code fetcherThreadMap}; must not be
   *     blank
   */
  protected void addFetcherForTopicPartition(
      TopicPartition tp, PartitionOffsetInfo offsetInfo, String fetcherThreadName) {
    LOGGER.info("Enter add fetcher thread for partitions {}, fetcherThread {}", tp,
        fetcherThreadName);
    synchronized (fetcherMapLock) {
      if (StringUtils.isBlank(fetcherThreadName)) {
        LOGGER.warn("Unexpected behavior, can't find threadName for topic partition {}", tp);
        return;
      }

      ConsumerFetcherThread fetcherThread = fetcherThreadMap.get(fetcherThreadName);
      if (fetcherThread == null) {
        try {
          // The remainder's magnitude is always smaller than messageQueue.size(), so Math.abs
          // cannot overflow here even when hashCode() is Integer.MIN_VALUE.
          int queueIndex = Math.abs(fetcherThreadName.hashCode() % messageQueue.size());

          LOGGER.info("Creating fetcher thread {}", fetcherThreadName);
          CustomizedConsumerConfig cloned = (CustomizedConsumerConfig) consumerProperties.clone();
          String clientIdPrefix = cloned
              .getProperty(ConsumerConfig.CLIENT_ID_CONFIG, "ureplicator");
          int fetcherThreadId = fetcherId.incrementAndGet();
          // Kafka does not support two consumers sharing the same client id; it logs
          // "WARN Error registering AppInfo mbean". Give every fetcher its own id.
          cloned.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,
              String.format("%s-%d", clientIdPrefix, fetcherThreadId));
          fetcherThread = createConsumerFetcherThread(fetcherThreadName, cloned,
              messageLimiter, messageQueue.get(queueIndex));
          fetcherThread.start();
          // Register only after start() returned normally, so the map never holds a thread
          // whose start() threw.
          fetcherThreadMap.put(fetcherThreadName, fetcherThread);
          LOGGER.info("Fetcher fetcher thread {} created", fetcherThreadName);
        } catch (Exception e) {
          // Report the fetcher thread that failed (not the name of the enclosing object), so
          // the log entry identifies which thread could not be created.
          LOGGER.error("Failed to create new fetcher thread {}", fetcherThreadName, e);
          return;
        }
      }
      fetcherThread.addPartitions(ImmutableMap.of(tp, offsetInfo));
      // putIfAbsent keeps any mapping that already exists for this partition.
      partitionThreadMap.putIfAbsent(tp, fetcherThreadName);
    }
    LOGGER.info("Add fetcher thread for partitions {} finished", tp);
  }