public void heartbeat()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/controller/rpc/ControllerWorkerService.java [140:203]


  /**
   * Handles a heartbeat RPC sent by a worker.
   *
   * <p>The handler is wrapped with {@code withLeaderRedirect} and executed through {@code
   * Instrumentation.instrument.withStreamObserver} under the name {@code
   * "masterworkerservice.heartbeat"}. The main path does the following, in order:
   *
   * <ol>
   *   <li>instruments the worker node carried in the request participants;
   *   <li>loads the stored worker for that node id from {@code workerStore};
   *   <li>loads every job in {@code jobStore} whose worker id matches the stored worker;
   *   <li>records the job statuses reported in the request via {@code updateJobStatuses};
   *   <li>builds the command list from the assigned jobs and the reported running jobs, and logs
   *       it;
   *   <li>writes the worker back with state {@code WORKER_STATE_WORKING};
   *   <li>replies with the master node, the stored worker node and the command list.
   * </ol>
   *
   * @param request heartbeat carrying the worker's participant info and its job status list
   * @param responseObserver observer that receives the {@link HeartbeatResponse}
   */
  public void heartbeat(
      HeartbeatRequest request, StreamObserver<HeartbeatResponse> responseObserver) {
    Instrumentation.instrument.withStreamObserver(
        logger,
        infra.scope(),
        infra.tracer(),
        withLeaderRedirect(
            (req, resW) -> {
              // Use the request handed to this handler consistently instead of mixing it with
              // the captured outer `request`.
              Node node = req.getParticipants().getWorker();
              instrumentParticipants(node);
              long workerId = node.getId();
              // If the worker does not exist, workerStore.get will throw an exception, and this
              // master will send the exception to the worker.
              Versioned<StoredWorker> worker = workerStore.get(workerId);
              StoredWorker storedWorker = worker.model();
              // Hoisted so the predicate below does not re-resolve the id for every job.
              long storedWorkerId = storedWorker.getNode().getId();
              Map<Long, Versioned<StoredJob>> assigned =
                  jobStore.getAll(job -> job.getWorkerId() == storedWorkerId);
              // workers only report running jobs
              List<JobStatus> running = req.getJobStatusList();
              updateJobStatuses(running, workerId);
              List<Command> commandList =
                  buildCommandList(buildAssignedJobsMap(assigned), buildRunningJobsMap(running));
              logCommandsToWorker(workerId, commandList);

              // Both write paths persist the same model; only the write method differs.
              StoredWorker workingWorker =
                  StoredWorker.newBuilder(storedWorker)
                      .setState(WorkerState.WORKER_STATE_WORKING)
                      .build();
              if (storedWorker.getState() != WorkerState.WORKER_STATE_WORKING) {
                // putThrough if state is changing
                workerStore.putThrough(
                    storedWorkerId, VersionedProto.from(workingWorker, worker.version()));
              } else {
                // otherwise, use regular put, which defers to the configured bufferedWriteInterval.
                workerStore.put(
                    storedWorkerId, VersionedProto.from(workingWorker, worker.version()));
              }

              Participants participants =
                  Participants.newBuilder()
                      .setMaster(master)
                      .setWorker(storedWorker.getNode())
                      .build();
              resW.onNext(
                  HeartbeatResponse.newBuilder()
                      .setParticipants(participants)
                      .addAllCommands(commandList)
                      .build());
              resW.onCompleted();
            },
            // Response builder passed to withLeaderRedirect; it receives a node and reports it as
            // the master (presumably used when redirecting to the leader - confirm against
            // withLeaderRedirect). We deliberately don't set worker because the response should
            // only contain worker info obtained from ZooKeeper.
            node ->
                HeartbeatResponse.newBuilder()
                    .setParticipants(Participants.newBuilder().setMaster(node).build())
                    .build()),
        request,
        responseObserver,
        "masterworkerservice.heartbeat");
  }