public void doWork()

in uReplicator-Worker/src/main/java/com/uber/stream/ureplicator/worker/ConsumerFetcherThread.java [92:132]


  public void doWork() {
    synchronized (partitionMapLock) {
      try {
        ConsumerRecords records = null;
        boolean topicChanged = refreshPartitionMap();
        if (topicChanged) {
          LOGGER.info("[{}]Assignment changed, partitionMap {}, partitionResetOffsetMap: {} ",
              getName(),
              partitionMap.keySet(),
              partitionResetOffsetMap);
          kafkaConsumer.assign(partitionMap.keySet());
          for (Map.Entry<TopicPartition, Long> entry : partitionResetOffsetMap.entrySet()) {
            TopicPartition tp = entry.getKey();
            long offset = entry.getValue();
            if (offset >= 0) {
              kafkaConsumer.seek(tp, offset);
            }
          }
          partitionResetOffsetMap.clear();
        }
        if (partitionMap.size() != 0) {
          records = kafkaConsumer.poll(pollTimeoutMs);
        }
        if (partitionMap.size() == 0 || records == null || records.isEmpty()) {
          partitionMapLock.wait(fetchBackOffMs);
          return;
        }

        processFetchedData(records);
        logTopicPartitionInfo();
      } catch (Throwable e) {
        LOGGER.error("[{}]: Catch Throwable exception: {}", getName(), e.getMessage(), e);
        // reset offset to fetchoffset to avoid data loss
        for (PartitionOffsetInfo offsetInfo : partitionMap.values()) {
          if (offsetInfo.fetchOffset() >= 0) {
            kafkaConsumer.seek(offsetInfo.topicPartition(), offsetInfo.fetchOffset());
          }
        }
      }
    }
  }