in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/controller/rpc/ControllerWorkerService.java [280:362]
/**
 * Persists the latest status a worker reported for each of its jobs.
 *
 * <p>Statuses are independent of one another, so they are handled in parallel. For each one we
 * forward throughput metrics to the sink, emit the committed offset as a gauge, and write the
 * status to the job status store only when it differs from the stored copy (creating a fresh
 * entry when no copy exists yet).
 *
 * @param jobStatusList job statuses reported by the worker
 * @param workerId id of the worker that reported them
 */
void updateJobStatuses(List<JobStatus> jobStatusList, long workerId) {
  Instrumentation.instrument.returnVoidCatchThrowable(
      logger,
      infra.scope(),
      infra.tracer(),
      () -> {
        Timestamp now = TimestampUtils.currentTimeMilliseconds();
        // Per-status updates are independent, so fan out via a parallel stream.
        jobStatusList
            .parallelStream()
            .forEach(reported -> persistJobStatus(reported, workerId, now));
      },
      "masterworkerservice.updatejobstatuses");
}

/**
 * Emits metrics for a single reported job status and stores it if it changed.
 *
 * @param reported the job status received from the worker
 * @param workerId id of the reporting worker
 * @param now timestamp to stamp onto the stored record
 */
private void persistJobStatus(JobStatus reported, long workerId, Timestamp now) {
  KafkaConsumerTask consumerTask = reported.getJob().getKafkaConsumerTask();
  KafkaConsumerTaskStatus consumerTaskStatus = reported.getKafkaConsumerTaskStatus();
  // Always forward throughput numbers, regardless of whether the status changed.
  jobThroughputSink.consume(
      reported.getJob(),
      consumerTaskStatus.getMessagesPerSec(),
      consumerTaskStatus.getBytesPerSec());
  StoredJobStatus candidate =
      StoredJobStatus.newBuilder()
          .setLastUpdated(now)
          .setJobStatus(reported)
          .setWorkerId(workerId)
          .build();
  // Gauge the committed offset, tagged by group/topic/partition.
  infra
      .scope()
      .tagged(
          ImmutableMap.of(
              StructuredLogging.KAFKA_GROUP, consumerTask.getConsumerGroup(),
              StructuredLogging.KAFKA_TOPIC, consumerTask.getTopic(),
              StructuredLogging.KAFKA_PARTITION,
              Integer.toString(consumerTask.getPartition())))
      .gauge(StructuredLogging.COMMITTED_OFFSET)
      .update(consumerTaskStatus.getCommitOffset());
  try {
    // Fetch the previously stored copy; the store throws when none exists yet.
    Versioned<StoredJobStatus> previous = jobStatusStore.get(reported.getJob().getJobId());
    StoredJobStatus previousModel = previous.model();
    boolean unchanged =
        Objects.nonNull(previousModel)
            && reported.equals(previousModel.getJobStatus())
            && workerId == previousModel.getWorkerId();
    try {
      // Only write to zk when something actually changed, to avoid redundant updates.
      if (!unchanged) {
        jobStatusStore.put(
            reported.getJob().getJobId(), Versioned.from(candidate, previous.version()));
      }
    } catch (Exception e) {
      logger.warn(
          "failed to put stored jobStatus to zk",
          StructuredLogging.jobId(reported.getJob().getJobId()),
          StructuredLogging.kafkaGroup(consumerTask.getConsumerGroup()),
          StructuredLogging.kafkaTopic(consumerTask.getTopic()),
          StructuredLogging.kafkaPartition(consumerTask.getPartition()),
          e);
    }
  } catch (Exception e) {
    // No stored status yet (or the read failed) — fall through to creating one.
    logger.debug(
        "no stored jobStatus before",
        StructuredLogging.jobId(reported.getJob().getJobId()),
        e);
    try {
      jobStatusStore.create(candidate, (key, value) -> value);
    } catch (Exception ex) {
      logger.error(
          "failed to create stored jobStatus",
          StructuredLogging.jobId(reported.getJob().getJobId()),
          StructuredLogging.kafkaGroup(consumerTask.getConsumerGroup()),
          StructuredLogging.kafkaTopic(consumerTask.getTopic()),
          StructuredLogging.kafkaPartition(consumerTask.getPartition()),
          ex);
    }
  }
}