public State nextState()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/controller/StateWorking.java [88:434]

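In brief, nextState() advances the worker's state machine by one heartbeat cycle: it collects job statuses from the user-provided Controllable, sends a heartbeat to the current master, asynchronously dispatches any RUN/CANCEL/UPDATE commands in the response, and returns the next state. It stays in Working on success or after following a master redirect, remains in Working and retries while the heartbeat fails but the lease is still valid, and falls back to Connecting (cancelling all local jobs) once the lease expires.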

  public State nextState() {
    final String cancelAllCommand = "cancelAll";
    final String getAllCommand = "getAll";
    final String aggregatedStage = "aggregated";
    boolean isFailure = false;
    long startNs = System.nanoTime();
    try {
      // Get job statuses from the user-provided JobStatusChecker.
      Collection<JobStatus> jobList = null;
      Stopwatch statusCheckTimer = infra.scope().timer(Controllable.Metrics.LATENCY).start();
      boolean jobStatusCheckerFailure = false;
      Map<String, String> getAllTags =
          ImmutableMap.of(StructuredLogging.CONTROLLABLE_COMMAND, getAllCommand);
      try {
        infra.scope().tagged(getAllTags).counter(Controllable.Metrics.CALLED).inc(1);
        logger.debug(
            Controllable.Metrics.CALLED,
            StructuredLogging.workerStage(aggregatedStage),
            StructuredLogging.controllableCommand(getAllCommand));
        jobList = controllable.getJobStatus();
      } catch (Exception e) {
        jobStatusCheckerFailure = true;
        infra.scope().tagged(getAllTags).counter(Controllable.Metrics.FAILURE).inc(1);
        logger.warn(
            Controllable.Metrics.FAILURE,
            StructuredLogging.workerStage(aggregatedStage),
            StructuredLogging.controllableCommand(getAllCommand),
            e);
      } finally {
        statusCheckTimer.stop();
        if (!jobStatusCheckerFailure) {
          infra.scope().tagged(getAllTags).counter(Controllable.Metrics.SUCCESS).inc(1);
          logger.debug(
              Controllable.Metrics.SUCCESS,
              StructuredLogging.workerStage(aggregatedStage),
              StructuredLogging.controllableCommand(getAllCommand));
        }
      }
      // We fall back to sending an empty heartbeat if the job status check fails because
      // we don't want to trigger an unnecessary rebalance.
      // This keeps job assignment stable, although the jobs may make no progress.
      // The master is responsible for ensuring job progress by checking the reported job status.
      if (jobList == null) {
        jobList = ImmutableList.of();
      }

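      // Fail fast if the gRPC channel is already terminated: a terminated channel can never
      // transmit the heartbeat, and the outer catch below decides whether to reconnect or to
      // fall back to StateConnecting.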
      if (controllerClient.getChannel().isTerminated()) {
        throw new IllegalStateException("master client grpc channel is terminated");
      }
      // TODO(T4576995) re-resolve uns on heartbeatFailure
      // Send Heartbeat Request
      boolean heartbeatFailure = false;
      Stopwatch heartbeatTimer = infra.scope().timer(MetricsNames.HEARTBEAT_LATENCY).start();
      HeartbeatResponse heartbeatResponse = null;
      try {
        HeartbeatRequest heartbeatRequest =
            HeartbeatRequest.newBuilder()
                .setParticipants(
                    Participants.newBuilder()
                        .setMaster(controllerClient.getNode())
                        .setWorker(worker)
                        .build())
                .addAllJobStatus(jobList)
                .build();
        heartbeatResponse =
            controllerClient
                .getStub()
                .withDeadlineAfter(heartbeatTimeout.toMillis(), TimeUnit.MILLISECONDS)
                .heartbeat(heartbeatRequest);
      } catch (Exception e) {
        heartbeatFailure = true;
        infra.scope().counter(MetricsNames.HEARTBEAT_FAILURE).inc(1);
        // Rethrowing lets the method's outer catch log the error and drive the correct state
        // transition, so we don't log here.
        throw e;
      } finally {
        heartbeatTimer.stop();
        if (!heartbeatFailure) {
          infra.scope().counter(MetricsNames.HEARTBEAT_SUCCESS).inc(1);
        }
      }
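      // If the responding master is the one we are connected to, renew the lease and apply its
      // commands; otherwise the response names a new master and we follow the redirect protocol
      // in the else branch.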
      if (heartbeatResponse.getParticipants().getMaster().equals(controllerClient.getNode())) {
        assertValidWorker(heartbeatResponse.getParticipants().getWorker().getId());
        lease.success();
        // Submit commands to the user-provided JobManager.
        for (Command command : heartbeatResponse.getCommandsList()) {
          Function<Job, CompletionStage<Void>> controllableToRun = null;
          final Scope commandScope =
              infra
                  .scope()
                  .tagged(
                      ImmutableMap.of(
                          StructuredLogging.COMMAND_TYPE, command.getType().toString()));
          switch (command.getType()) {
            case COMMAND_TYPE_RUN_JOB:
              controllableToRun = controllable::run;
              break;
            case COMMAND_TYPE_CANCEL_JOB:
              controllableToRun = controllable::cancel;
              break;
            case COMMAND_TYPE_UPDATE_JOB:
              controllableToRun = controllable::update;
              break;
            default:
              logger.warn(
                  Controllable.Metrics.SKIP, StructuredLogging.commandType(command.getType()));
              commandScope.counter(Controllable.Metrics.SKIP).inc(1);
              continue;
          }

          // Execute controllable on a separate thread so we don't block the heartbeat.
          final Function<Job, CompletionStage<Void>> immutableControllableToRun = controllableToRun;
          final long workerId = heartbeatResponse.getParticipants().getWorker().getId();
          final Stopwatch commandTimer = commandScope.timer(Controllable.Metrics.LATENCY).start();
          commandScope.counter(Controllable.Metrics.CALLED).inc(1);
          logger.info(
              Controllable.Metrics.CALLED,
              StructuredLogging.workerId(workerId),
              StructuredLogging.jobId(command.getJob().getJobId()),
              StructuredLogging.kafkaTopic(command.getJob().getKafkaConsumerTask().getTopic()),
              StructuredLogging.kafkaCluster(command.getJob().getKafkaConsumerTask().getCluster()),
              StructuredLogging.kafkaGroup(
                  command.getJob().getKafkaConsumerTask().getConsumerGroup()),
              StructuredLogging.kafkaPartition(
                  command.getJob().getKafkaConsumerTask().getPartition()),
              StructuredLogging.commandType(command.getType().toString()));
          // A command timeout indicates that the PipelineManager failed to process the command
          // within the allotted time. This metric is used as an indicator of a zombie worker.
          ScheduledFuture<?> commandTimeout =
              COMMAND_TIMEOUT_SCHEDULER.schedule(
                  () -> {
                    logger.error(
                        Controllable.Metrics.TIMEOUT,
                        StructuredLogging.workerId(workerId),
                        StructuredLogging.jobId(command.getJob().getJobId()),
                        StructuredLogging.kafkaTopic(
                            command.getJob().getKafkaConsumerTask().getTopic()),
                        StructuredLogging.kafkaCluster(
                            command.getJob().getKafkaConsumerTask().getCluster()),
                        StructuredLogging.kafkaGroup(
                            command.getJob().getKafkaConsumerTask().getConsumerGroup()),
                        StructuredLogging.kafkaPartition(
                            command.getJob().getKafkaConsumerTask().getPartition()),
                        StructuredLogging.commandType(command.getType().toString()));
                    infra.scope().counter(Controllable.Metrics.TIMEOUT).inc(1);
                  },
                  COMMAND_TIMEOUT_MS,
                  TimeUnit.MILLISECONDS);
          CompletableFuture.runAsync(
              () -> {
                immutableControllableToRun
                    .apply(command.getJob())
                    .whenComplete(
                        (v, ex) -> {
                          // cancel commandTimeout if the command completed in time
                          commandTimeout.cancel(false);
                          commandTimer.stop();
                          // TODO: this section is not covered by tests due to a flaky test;
                          // fix the flaky test.
                          if (ex != null) {
                            logger.warn(
                                Controllable.Metrics.FAILURE,
                                StructuredLogging.workerId(workerId),
                                StructuredLogging.jobId(command.getJob().getJobId()),
                                StructuredLogging.kafkaTopic(
                                    command.getJob().getKafkaConsumerTask().getTopic()),
                                StructuredLogging.kafkaCluster(
                                    command.getJob().getKafkaConsumerTask().getCluster()),
                                StructuredLogging.kafkaGroup(
                                    command.getJob().getKafkaConsumerTask().getConsumerGroup()),
                                StructuredLogging.kafkaPartition(
                                    command.getJob().getKafkaConsumerTask().getPartition()),
                                StructuredLogging.commandType(command.getType().toString()),
                                ex);
                            commandScope.counter(Controllable.Metrics.FAILURE).inc(1);
                          } else {
                            logger.info(
                                Controllable.Metrics.SUCCESS,
                                StructuredLogging.workerId(workerId),
                                StructuredLogging.jobId(command.getJob().getJobId()),
                                StructuredLogging.kafkaTopic(
                                    command.getJob().getKafkaConsumerTask().getTopic()),
                                StructuredLogging.kafkaCluster(
                                    command.getJob().getKafkaConsumerTask().getCluster()),
                                StructuredLogging.kafkaGroup(
                                    command.getJob().getKafkaConsumerTask().getConsumerGroup()),
                                StructuredLogging.kafkaPartition(
                                    command.getJob().getKafkaConsumerTask().getPartition()),
                                StructuredLogging.commandType(command.getType().toString()));
                            commandScope.counter(Controllable.Metrics.SUCCESS).inc(1);
                          }
                        });
              },
              commandExecutorService);
        }
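        // All commands have been handed off asynchronously; remain in Working, adopting the
        // worker identity echoed back by the master.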
        return StateWorking.from(
            this, heartbeatResponse.getParticipants().getWorker(), controllerClient);
      } else {
        // This else branch handles the case where the master has changed.
        // Set isFailure so the finally block skips the generic markSuccess; a redirect-specific
        // markSuccess is invoked below instead.
        isFailure = true;
        infra
            .scope()
            .tagged(
                ImmutableMap.of(
                    StructuredLogging.FROM_MASTER_HOST,
                    controllerClient.getNode().getHost(),
                    StructuredLogging.TO_MASTER_HOST,
                    heartbeatResponse.getParticipants().getMaster().getHost()))
            .counter(MetricsNames.HEARTBEAT_REDIRECT)
            .inc(1);
        ControllerClient newControllerClient = null;
        try {
          newControllerClient =
              controllerClientFactory.reconnectOnChange(
                  this.controllerClient, heartbeatResponse.getParticipants().getMaster());
        } catch (Exception reconnectException) {
          logger.error(
              "[{} -> {}] "
                  + "got redirect master response but failed to reconnect to the new master",
              StateWorking.STATE,
              StateWorking.STATE,
              StructuredLogging.masterHostPort(
                  NodeUtils.getHostAndPortString(heartbeatResponse.getParticipants().getMaster())),
              reconnectException);
          // rethrow the exception
          throw reconnectException;
        }
        markSuccess(
            "successfully redirected to new master",
            StateWorking.STATE,
            StateWorking.STATE,
            Duration.ofNanos(System.nanoTime() - startNs),
            NodeUtils.getHostAndPortString(heartbeatResponse.getParticipants().getMaster()),
            true);
        // The redirect response only carries the new master, not the worker, so we reuse our
        // existing worker identity.
        return StateWorking.from(this, this.worker, newControllerClient);
      }
    } catch (Exception e) {
      isFailure = true;
      if (!lease.isValid()) {
        markError(
            "failed to heartbeat with master and lease expired",
            StateWorking.STATE,
            StateConnecting.STATE,
            Duration.ofNanos(System.nanoTime() - startNs),
            NodeUtils.getHostAndPortString(controllerClient.getNode()),
            e);
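        // The lease has expired, so the master may already have reassigned our jobs elsewhere;
        // cancel everything locally before dropping back to StateConnecting.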
        cancelAll(aggregatedStage, cancelAllCommand);
        try {
          controllerClient.close();
        } catch (Exception closeException) {
          // We don't take any extra action here because we have already initiated the shutdown
          // process. The exception indicates that the masterClient was not closed within a
          // pre-defined timeout, but it should eventually close.
          logger.warn(
              "failed to close the master client",
              StructuredLogging.masterHostPort(
                  NodeUtils.getHostAndPortString(controllerClient.getNode())),
              closeException);
        }
        return StateConnecting.from(this);
      } else {
        // Reconnect when the current leader master is unavailable. This could be caused by:
        // 1. the current leader master is dead, or
        // 2. a network issue.
        //
        // In either case, the master leader redirect protocol does not work.
        if (e instanceof StatusRuntimeException
            && ((StatusRuntimeException) e).getStatus().getCode() == Status.UNAVAILABLE.getCode()) {
          ControllerClient newControllerClient = null;
          try {
            newControllerClient = controllerClientFactory.reconnect(controllerClient);
          } catch (Exception reconnectException) {
            markError(
                "lease has not expired. failed to heartbeat with master and reconnect failed",
                StateWorking.STATE,
                StateWorking.STATE,
                Duration.ofNanos(System.nanoTime() - startNs),
                NodeUtils.getHostAndPortString(controllerClient.getNode()),
                reconnectException);
            return StateWorking.from(this, this.worker, controllerClient);
          }
          if (!newControllerClient.getNode().equals(controllerClient.getNode())) {
            infra
                .scope()
                .tagged(
                    ImmutableMap.of(
                        StructuredLogging.FROM_MASTER_HOST,
                        controllerClient.getNode().getHost(),
                        StructuredLogging.TO_MASTER_HOST,
                        newControllerClient.getNode().getHost()))
                .counter(MetricsNames.HEARTBEAT_REDIRECT)
                .inc(1);
            markWarn(
                "lease has not expired. failed to heartbeat with master but successfully reconnected to a new master",
                StateWorking.STATE,
                StateWorking.STATE,
                Duration.ofNanos(System.nanoTime() - startNs),
                NodeUtils.getHostAndPortString(newControllerClient.getNode()),
                e);
          } else {
            markWarn(
                "lease has not expired. failed to heartbeat with master but successfully reconnected to the same master",
                StateWorking.STATE,
                StateWorking.STATE,
                Duration.ofNanos(System.nanoTime() - startNs),
                NodeUtils.getHostAndPortString(newControllerClient.getNode()),
                e);
          }
          return StateWorking.from(this, this.worker, newControllerClient);
        }
        // Do not reconnect aggressively. The heartbeat might fail transiently, in which case
        // the leader has not changed. Even if the master leader does change, the master leader
        // redirect protocol can redirect the worker to the current leader.
        //
        // If we reconnect aggressively, it takes several rounds for the worker to
        // (1) connect to a random master,
        // (2) get redirected to the leader, and
        // (3) set up a connection with that master.
        // This process normally takes long enough that the worker fails its heartbeat with the
        // master, cancels all of its running jobs, and the master reassigns those jobs to a new
        // worker. The job reassignment introduces unnecessary E2E latency for the message
        // receiver services.
        markWarn(
            "lease has not expired. failed to heartbeat with master",
            StateWorking.STATE,
            StateWorking.STATE,
            Duration.ofNanos(System.nanoTime() - startNs),
            NodeUtils.getHostAndPortString(controllerClient.getNode()),
            e);

        return StateWorking.from(this, this.worker, controllerClient);
      }
    } finally {
      if (!isFailure) {
        markSuccess(
            "successfully heartbeat with master",
            StateWorking.STATE,
            StateWorking.STATE,
            Duration.ofNanos(System.nanoTime() - startNs),
            NodeUtils.getHostAndPortString(controllerClient.getNode()),
            true);
      }
    }
  }
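
For orientation, here is a minimal sketch of the Controllable surface that nextState() drives, inferred purely from the call sites above; the actual interface in uforwarder-core may differ in names and signatures. Job and JobStatus are the datatransfer protobuf types used throughout this file.

import java.util.Collection;
import java.util.concurrent.CompletionStage;

// Hypothetical sketch inferred from nextState()'s call sites; not the real interface.
interface ControllableSketch {
  // Polled once per heartbeat cycle; a failure here degrades to an empty heartbeat
  // rather than failing the cycle.
  Collection<JobStatus> getJobStatus();

  // Each is dispatched asynchronously, one per command, with a watchdog timer and
  // success/failure metrics recorded on completion.
  CompletionStage<Void> run(Job job);

  CompletionStage<Void> cancel(Job job);

  CompletionStage<Void> update(Job job);

  // Assumed target of the cancelAll(...) helper invoked on lease expiry above.
  CompletionStage<Void> cancelAll();
}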