public void doWork()

in uReplicator-Worker/src/main/java/com/uber/stream/ureplicator/worker/FetcherManager.java [273:320]


  public void doWork() {
    synchronized (updateMapLock) {
      try {
        if (partitionAddMap.size() != 0) {
          Map<TopicPartition, PartitionOffsetInfo> newPartitionMap = addTopicPartitionsToFetcherThread(
              partitionAddMap);

          for (Map.Entry<TopicPartition, PartitionOffsetInfo> entry : newPartitionMap.entrySet()) {
            partitionMap.put(entry.getKey(), entry.getValue());
            partitionAddMap.remove(entry.getKey());
          }
        }

        if (partitionAddMap.size() != 0) {
          LOGGER.info("topic partitions not successfully add to fetcher thread: {}",
              partitionAddMap.keySet());
        }

        // FIXME: propagate topic partition change to controller
        // Remove topic partition already finished
        for (PartitionOffsetInfo partition : partitionMap.values()) {
          if (partition.consumedEndBounded()) {
            partitionDeleteMap.put(partition.topicPartition(), true);
          }
        }

        for (TopicPartition partition : partitionDeleteMap.keySet()) {
          if (partitionMap.containsKey(partition)) {
            partitionMap.remove(partition);
            partitionThreadMap.remove(partition);
          }
        }

        if (partitionDeleteMap.size() != 0) {
          removeFetcherForPartitions(partitionDeleteMap.keySet());
        }
        partitionDeleteMap.clear();

      } catch (Throwable t) {
        LOGGER.error("[{}]: Catch Throwable exception: {}", getName(), t.getMessage(), t);
      }
    }
    try {
      Thread.sleep(refreshBackoff);
    } catch (InterruptedException e) {
      LOGGER.error("[{}]InterruptedException on sleep", getName());
    }
  }