in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/controller/rpc/ControllerWorkerService.java [140:203]
public void heartbeat(
HeartbeatRequest request, StreamObserver<HeartbeatResponse> responseObserver) {
Instrumentation.instrument.withStreamObserver(
logger,
infra.scope(),
infra.tracer(),
withLeaderRedirect(
(req, resW) -> {
instrumentParticipants(req.getParticipants().getWorker());
Node node = request.getParticipants().getWorker();
long workerId = node.getId();
// If a worker does not exist, workerStore.get will throw an exception, and this
// master will
// send the exception to the worker.
Versioned<StoredWorker> worker = workerStore.get(workerId);
Map<Long, Versioned<StoredJob>> assigned =
jobStore.getAll(job -> job.getWorkerId() == worker.model().getNode().getId());
// workers only report running jobs
List<JobStatus> running = request.getJobStatusList();
updateJobStatuses(running, workerId);
List<Command> commandList =
buildCommandList(buildAssignedJobsMap(assigned), buildRunningJobsMap(running));
logCommandsToWorker(workerId, commandList);
// putThrough if state is changing
if (worker.model().getState() != WorkerState.WORKER_STATE_WORKING) {
workerStore.putThrough(
worker.model().getNode().getId(),
VersionedProto.from(
StoredWorker.newBuilder(worker.model())
.setState(WorkerState.WORKER_STATE_WORKING)
.build(),
worker.version()));
} else {
// otherwise, use regular put, which defers to the configured bufferedWriteInterval.
workerStore.put(
worker.model().getNode().getId(),
VersionedProto.from(
StoredWorker.newBuilder(worker.model())
.setState(WorkerState.WORKER_STATE_WORKING)
.build(),
worker.version()));
}
resW.onNext(
HeartbeatResponse.newBuilder()
.setParticipants(
Participants.newBuilder()
.setMaster(master)
.setWorker(worker.model().getNode())
.build())
.addAllCommands(commandList)
.build());
resW.onCompleted();
},
// we deliberately don't set worker because the response should only contain
// worker info got from the zookeeper.
node ->
HeartbeatResponse.newBuilder()
.setParticipants(Participants.newBuilder().setMaster(node).build())
.build()),
request,
responseObserver,
"masterworkerservice.heartbeat");
}