void updateJobStatuses()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/controller/rpc/ControllerWorkerService.java [280:362]


  void updateJobStatuses(List<JobStatus> jobStatusList, long workerId) {
    Instrumentation.instrument.returnVoidCatchThrowable(
        logger,
        infra.scope(),
        infra.tracer(),
        () -> {
          Timestamp timestamp = TimestampUtils.currentTimeMilliseconds();
          // Job status updates are done in parallel using Java parallel stream
          jobStatusList
              .parallelStream()
              .forEach(
                  jobStatus -> {
                    KafkaConsumerTask task = jobStatus.getJob().getKafkaConsumerTask();
                    KafkaConsumerTaskStatus taskStatus = jobStatus.getKafkaConsumerTaskStatus();
                    jobThroughputSink.consume(
                        jobStatus.getJob(),
                        taskStatus.getMessagesPerSec(),
                        taskStatus.getBytesPerSec());
                    StoredJobStatus storedJobStatus =
                        StoredJobStatus.newBuilder()
                            .setLastUpdated(timestamp)
                            .setJobStatus(jobStatus)
                            .setWorkerId(workerId)
                            .build();
                    infra
                        .scope()
                        .tagged(
                            ImmutableMap.of(
                                StructuredLogging.KAFKA_GROUP, task.getConsumerGroup(),
                                StructuredLogging.KAFKA_TOPIC, task.getTopic(),
                                StructuredLogging.KAFKA_PARTITION,
                                    Integer.toString(task.getPartition())))
                        .gauge(StructuredLogging.COMMITTED_OFFSET)
                        .update(taskStatus.getCommitOffset());
                    try {
                      // try to get the existing job status
                      Versioned<StoredJobStatus> oldStoredJobStatus =
                          jobStatusStore.get(jobStatus.getJob().getJobId());
                      StoredJobStatus oldJobStatusModel = oldStoredJobStatus.model();
                      // Check if the job status has been changed
                      boolean jobStatusNotChanged =
                          Objects.nonNull(oldJobStatusModel)
                              && jobStatus.equals(oldJobStatusModel.getJobStatus())
                              && workerId == oldJobStatusModel.getWorkerId();
                      boolean jobStatusChanged = !jobStatusNotChanged;
                      try {
                        // try to put job status to zk only if the job status is changed
                        if (jobStatusChanged) {
                          jobStatusStore.put(
                              jobStatus.getJob().getJobId(),
                              Versioned.from(storedJobStatus, oldStoredJobStatus.version()));
                        }
                      } catch (Exception e) {
                        logger.warn(
                            "failed to put stored jobStatus to zk",
                            StructuredLogging.jobId(jobStatus.getJob().getJobId()),
                            StructuredLogging.kafkaGroup(task.getConsumerGroup()),
                            StructuredLogging.kafkaTopic(task.getTopic()),
                            StructuredLogging.kafkaPartition(task.getPartition()),
                            e);
                      }
                    } catch (Exception e) {
                      logger.debug(
                          "no stored jobStatus before",
                          StructuredLogging.jobId(jobStatus.getJob().getJobId()),
                          e);
                      try {
                        // if no stored job status, try to create one
                        jobStatusStore.create(storedJobStatus, (key, value) -> value);
                      } catch (Exception ex) {
                        logger.error(
                            "failed to create stored jobStatus",
                            StructuredLogging.jobId(jobStatus.getJob().getJobId()),
                            StructuredLogging.kafkaGroup(task.getConsumerGroup()),
                            StructuredLogging.kafkaTopic(task.getTopic()),
                            StructuredLogging.kafkaPartition(task.getPartition()),
                            ex);
                      }
                    }
                  });
        },
        "masterworkerservice.updatejobstatuses");
  }