private void expandOrShrink()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/common/KafkaPartitionExpansionWatcher.java [166:247]


  /**
   * Reconciles the jobs stored in a job group with the supplied partition list and persists the
   * result when anything differs.
   *
   * <p>After reconciliation the group holds exactly one job for every key in {@code [0,
   * partitionInfos.size())}:
   *
   * <ul>
   *   <li>a stored job with that key is kept, with its job pod replaced when it differs from the
   *       pod computed by {@code getJobPod} for the partition at that index;
   *   <li>a missing key gets a new job built through {@code jobCreator};
   *   <li>stored jobs whose key lies outside the range are dropped.
   * </ul>
   *
   * <p>The write to {@code jobGroupStore} is skipped when no job was added, updated or dropped.
   * The whole operation runs inside {@code Instrumentation.instrument.returnVoidCatchThrowable},
   * tagged with the Kafka cluster, topic and consumer group of the job group.
   *
   * @param versionedJobGroup the stored job group together with the version handed back to the
   *     store on write
   * @param partitionInfos partitions of the topic; only the list index is used as job key here
   */
  private void expandOrShrink(
      Versioned<StoredJobGroup> versionedJobGroup, List<TopicPartitionInfo> partitionInfos) {
    Instrumentation.instrument.returnVoidCatchThrowable(
        logger,
        infra.scope(),
        infra.tracer(),
        () -> {
          StoredJobGroup storedGroup = versionedJobGroup.model();

          // Index the stored jobs by job key. Entries are pulled out of this map as they are
          // matched to a partition index, so whatever is left afterwards has no matching
          // partition and must be dropped.
          Map<Integer, StoredJob> unmatchedJobs =
              storedGroup
                  .getJobsList()
                  .stream()
                  .collect(Collectors.toMap(JobUtils::getJobKey, job -> job));

          Map<Integer, StoredJob> reconciledJobs = new HashMap<>();
          boolean dirty = false;
          for (int jobKey = 0; jobKey < partitionInfos.size(); jobKey++) {
            String jobPod = getJobPod(partitionInfos.get(jobKey));
            StoredJob existingJob = unmatchedJobs.remove(jobKey);

            if (existingJob == null) {
              // No job exists for this key yet: create one, initialized from the job group.
              StoredJob idSeed =
                  StoredJob.newBuilder()
                      .setJob(JobUtils.newJob(storedGroup.getJobGroup()))
                      .setJobPod(jobPod)
                      .build();
              reconciledJobs.put(
                  jobKey, jobCreator.newJob(storedGroup, jobIdProvider.getId(idSeed), jobKey));
              dirty = true;
            } else if (jobPod.equals(existingJob.getJobPod())) {
              // Nothing to update, carry the stored job over untouched.
              reconciledJobs.put(jobKey, existingJob);
            } else {
              // The pod moved: rewrite the job with the new pod so it gets persisted.
              reconciledJobs.put(
                  jobKey, StoredJob.newBuilder(existingJob).setJobPod(jobPod).build());
              dirty = true;
            }
          }

          // Skip the store write when no job was added or updated and no stored job was left
          // without a partition.
          if (!dirty && unmatchedJobs.isEmpty()) {
            return;
          }

          StoredJobGroup updatedGroup =
              StoredJobGroup.newBuilder(storedGroup)
                  .clearJobs()
                  .addAllJobs(reconciledJobs.values())
                  .build();
          jobGroupStore.put(
              updatedGroup.getJobGroup().getJobGroupId(),
              VersionedProto.from(updatedGroup, versionedJobGroup.version()));
        },
        "kafka-partition-expansion-watcher.expand-or-shrink",
        StructuredFields.KAFKA_CLUSTER,
        versionedJobGroup.model().getJobGroup().getKafkaConsumerTaskGroup().getCluster(),
        StructuredFields.KAFKA_TOPIC,
        versionedJobGroup.model().getJobGroup().getKafkaConsumerTaskGroup().getTopic(),
        StructuredFields.KAFKA_GROUP,
        versionedJobGroup.model().getJobGroup().getKafkaConsumerTaskGroup().getConsumerGroup());
  }