boolean extractTopicPartitionMap()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/fetchers/kafka/AbstractKafkaFetcherThread.java [494:576]


  /**
   * Diffs the previously running job set against the jobs the pipeline state manager currently
   * expects to run, populating the three output maps with the topic-partition-level changes.
   *
   * <p>Side effect: {@code currentRunningJobMap} is replaced with a snapshot of the expected
   * running jobs.
   *
   * @param newTopicPartitionJobMap out-param; receives topic partitions that were not running
   *     before (these need an offset seek)
   * @param removedTopicPartitionJobMap out-param; receives topic partitions of jobs that should
   *     be canceled
   * @param allTopicPartitionJobMap out-param; receives one job per expected topic partition
   *     (duplicates are logged and dropped — see TODO below)
   * @return true when there is any job to run or cancel, i.e. the assignment changed
   */
  boolean extractTopicPartitionMap(
      Map<TopicPartition, Job> newTopicPartitionJobMap,
      Map<TopicPartition, Job> removedTopicPartitionJobMap,
      Map<TopicPartition, Job> allTopicPartitionJobMap) {
    Preconditions.checkNotNull(pipelineStateManager, "pipeline config manager required");
    // Snapshot the expected running jobs up front because the underlying map may change.
    Map<Long, Job> expectedRunningJobMap = pipelineStateManager.getExpectedRunningJobMap();

    // step 1: capture the previously running jobs and their topic partitions, then swap the
    // current running map over to the expected snapshot.
    Set<Job> previousJobs = ImmutableSet.copyOf(currentRunningJobMap.values());
    Set<TopicPartition> previousPartitions = new HashSet<>();
    for (Job previousJob : currentRunningJobMap.values()) {
      previousPartitions.add(
          new TopicPartition(
              previousJob.getKafkaConsumerTask().getTopic(),
              previousJob.getKafkaConsumerTask().getPartition()));
    }
    currentRunningJobMap.clear();
    currentRunningJobMap.putAll(expectedRunningJobMap);

    // step 2: create allTopicPartitionJobMap from current running jobSet
    for (Job job : expectedRunningJobMap.values()) {
      // TODO (T4367183): currently, if more than one jobs have the same TopicPartition,
      //  randomly pick one. Need to think about how to merge jobs correctly carefully
      TopicPartition topicPartition =
          new TopicPartition(
              job.getKafkaConsumerTask().getTopic(), job.getKafkaConsumerTask().getPartition());
      if (!allTopicPartitionJobMap.containsKey(topicPartition)) {
        allTopicPartitionJobMap.put(topicPartition, job);
        continue;
      }
      // Duplicate topic partition: keep the first job, emit an error log and a metric.
      LOGGER.error(
          "two jobs have the same topic partition",
          StructuredLogging.kafkaCluster(job.getKafkaConsumerTask().getCluster()),
          StructuredLogging.kafkaGroup(job.getKafkaConsumerTask().getConsumerGroup()),
          StructuredLogging.kafkaTopic(job.getKafkaConsumerTask().getTopic()),
          StructuredLogging.kafkaPartition(job.getKafkaConsumerTask().getPartition()),
          StructuredLogging.reason(
              "two job ids: "
                  + job.getJobId()
                  + ", "
                  + allTopicPartitionJobMap.get(topicPartition).getJobId()));
      scope
          .tagged(
              StructuredTags.builder()
                  .setKafkaGroup(job.getKafkaConsumerTask().getConsumerGroup())
                  .setKafkaTopic(job.getKafkaConsumerTask().getTopic())
                  .setKafkaPartition(job.getKafkaConsumerTask().getPartition())
                  .build())
          .gauge(MetricNames.TOPIC_PARTITION_DUP_JOB)
          .update(1.0);
    }

    // step 3: check for new topic partitions
    // Topic partitions that were already running keep their position; no re-seek needed.
    allTopicPartitionJobMap.forEach(
        (topicPartition, job) -> {
          if (!previousPartitions.contains(topicPartition)) {
            newTopicPartitionJobMap.put(topicPartition, job);
          }
        });

    Set<Job> expectedJobs = ImmutableSet.copyOf(expectedRunningJobMap.values());

    // Jobs present before but no longer expected must be canceled; newly expected ones started.
    Sets.SetView<Job> cancelJobs = Sets.difference(previousJobs, expectedJobs);
    Sets.SetView<Job> runJobs = Sets.difference(expectedJobs, previousJobs);

    // step 4: add cancel jobs to removedTopicPartitionJobMap
    for (Job canceled : cancelJobs) {
      removedTopicPartitionJobMap.put(
          new TopicPartition(
              canceled.getKafkaConsumerTask().getTopic(),
              canceled.getKafkaConsumerTask().getPartition()),
          canceled);
    }

    logCommands(runJobs, cancelJobs);

    // step 5: Check for job assignment changes.
    return !runJobs.isEmpty() || !cancelJobs.isEmpty();
  }