boolean extractTopicPartitionMap()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/fetchers/kafka/AbstractKafkaFetcherThread.java [605:692]


  /**
   * Reconciles the currently-running job set against the expected-running job set from the
   * pipeline state manager, populating the three output maps supplied by the caller.
   *
   * <p>Side effects: {@code currentRunningJobMap} is replaced with a copy of the expected set, and
   * the three argument maps are mutated in place.
   *
   * @param newTopicPartitionJobMap receives topic-partitions not previously assigned (need seek)
   * @param removedTopicPartitionJobMap receives topic-partitions whose jobs were canceled
   * @param allTopicPartitionJobMap receives every expected topic-partition (one job per TP)
   * @return true if any job was started or canceled, i.e. the assignment changed
   */
  boolean extractTopicPartitionMap(
      Map<TopicPartition, Job> newTopicPartitionJobMap,
      Map<TopicPartition, Job> removedTopicPartitionJobMap,
      Map<TopicPartition, Job> allTopicPartitionJobMap) {
    Preconditions.checkNotNull(pipelineStateManager, "pipeline config manager required");
    // Take a snapshot: the expected-running map may change concurrently underneath us.
    Map<Long, Job> expectedRunningJobMap = pipelineStateManager.getExpectedRunningJobMap();

    // step 1: snapshot the previously running jobs and their topic-partitions, then swap the
    // expected set into currentRunningJobMap.
    Map<Long, Job> previousJobs = ImmutableMap.copyOf(currentRunningJobMap);
    Set<TopicPartition> previousTopicPartitions = new HashSet<>();
    for (Job job : previousJobs.values()) {
      previousTopicPartitions.add(
          new TopicPartition(
              job.getKafkaConsumerTask().getTopic(), job.getKafkaConsumerTask().getPartition()));
    }
    currentRunningJobMap.clear();
    currentRunningJobMap.putAll(expectedRunningJobMap);

    // step 2: build allTopicPartitionJobMap from the expected job set.
    for (Job job : expectedRunningJobMap.values()) {
      // TODO (T4367183): currently, if more than one jobs have the same TopicPartition,
      //  randomly pick one. Need to think about how to merge jobs correctly carefully
      TopicPartition topicPartition =
          new TopicPartition(
              job.getKafkaConsumerTask().getTopic(), job.getKafkaConsumerTask().getPartition());
      // First job for a topic-partition wins; a non-null return means a duplicate was seen.
      Job existing = allTopicPartitionJobMap.putIfAbsent(topicPartition, job);
      if (existing != null) {
        LOGGER.error(
            "two jobs have the same topic partition",
            StructuredLogging.kafkaCluster(job.getKafkaConsumerTask().getCluster()),
            StructuredLogging.kafkaGroup(job.getKafkaConsumerTask().getConsumerGroup()),
            StructuredLogging.kafkaTopic(job.getKafkaConsumerTask().getTopic()),
            StructuredLogging.kafkaPartition(job.getKafkaConsumerTask().getPartition()),
            StructuredLogging.reason(
                "two job ids: " + job.getJobId() + ", " + existing.getJobId()));
        scope
            .tagged(
                StructuredTags.builder()
                    .setKafkaGroup(job.getKafkaConsumerTask().getConsumerGroup())
                    .setKafkaTopic(job.getKafkaConsumerTask().getTopic())
                    .setKafkaPartition(job.getKafkaConsumerTask().getPartition())
                    .build())
            .gauge(MetricNames.TOPIC_PARTITION_DUP_JOB)
            .update(1.0);
      }
    }

    // step 3: any topic-partition absent from the previous assignment is new and needs a seek.
    allTopicPartitionJobMap.forEach(
        (topicPartition, job) -> {
          if (!previousTopicPartitions.contains(topicPartition)) {
            newTopicPartitionJobMap.put(topicPartition, job);
          }
        });

    // Jobs present in the expected set but not previously running are newly started.
    List<Job> startedJobs = new ArrayList<>();
    expectedRunningJobMap.forEach(
        (jobId, job) -> {
          if (!previousJobs.containsKey(jobId)) {
            startedJobs.add(job);
          }
        });

    // step 4: jobs that were running but are no longer expected are canceled; record their
    // topic-partitions so the caller can unassign them.
    List<Job> canceledJobs = new ArrayList<>();
    previousJobs.forEach(
        (jobId, job) -> {
          if (!expectedRunningJobMap.containsKey(jobId)) {
            canceledJobs.add(job);
            removedTopicPartitionJobMap.put(
                new TopicPartition(
                    job.getKafkaConsumerTask().getTopic(),
                    job.getKafkaConsumerTask().getPartition()),
                job);
          }
        });

    logCommands(startedJobs, canceledJobs);

    // step 5: the assignment changed iff anything started or stopped.
    return !startedJobs.isEmpty() || !canceledJobs.isEmpty();
  }