public State nextState()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/controller/StateRegistering.java [65:159]


  /**
   * Sends a {@code RegisterWorkerRequest} for this worker to the master referenced by {@code
   * controllerClient} and picks the next state from the outcome.
   *
   * <ul>
   *   <li>The response names the same master: the returned worker id is checked with {@code
   *       assertValidWorker}, {@code lease.success()} is called, and a {@code StateWorking} is
   *       returned.
   *   <li>The response names a different master: the redirect counter is incremented, a client for
   *       the new master is obtained through {@code controllerClientFactory.reconnectOnChange},
   *       and a new {@code StateRegistering} bound to that client is returned.
   *   <li>Any exception (including a failed reconnect): the failure counter is incremented, the
   *       error is reported through {@code markError}, the current client is closed on a
   *       best-effort basis, and a {@code StateConnecting} is returned.
   * </ul>
   *
   * <p>The register-worker latency timer is stopped on every path.
   *
   * @return the state to transition to
   */
  public State nextState() {
    // When true, the finally block must not report the "registered" success message.
    boolean isFailure = false;
    // Monotonic start timestamp; elapsed time is derived as a nanosecond difference.
    // NOTE(review): assumes Duration here is java.time.Duration - confirm against the imports.
    long startNs = System.nanoTime();
    Stopwatch registerWorkerTimer =
        infra.scope().timer(MetricsNames.REGISTER_WORKER_LATENCY).start();
    try {
      RegisterWorkerRequest request =
          RegisterWorkerRequest.newBuilder()
              .setParticipants(
                  Participants.newBuilder()
                      .setMaster(controllerClient.getNode())
                      .setWorker(worker)
                      .build())
              .build();
      RegisterWorkerResponse response = controllerClient.getStub().registerWorker(request);
      if (response.getParticipants().getMaster().equals(controllerClient.getNode())) {
        // The master we talked to accepted the registration.
        assertValidWorker(response.getParticipants().getWorker().getId());
        lease.success();
        return StateWorking.from(this, response.getParticipants().getWorker(), controllerClient);
      } else {
        // Redirected to another master. Raise the flag so the finally block skips its
        // "registered" markSuccess; the redirect reports its own markSuccess below.
        isFailure = true;
        infra
            .scope()
            .tagged(
                ImmutableMap.of(
                    StructuredLogging.FROM_MASTER_HOST,
                    controllerClient.getNode().getHost(),
                    StructuredLogging.TO_MASTER_HOST,
                    response.getParticipants().getMaster().getHost()))
            .counter(MetricsNames.REGISTER_WORKER_REDIRECT)
            .inc(1);
        ControllerClient newControllerClient = null;
        try {
          newControllerClient =
              controllerClientFactory.reconnectOnChange(
                  this.controllerClient, response.getParticipants().getMaster());
        } catch (Exception reconnectException) {
          logger.error(
              "[{} -> {}] "
                  + "got redirect master response but failed to reconnect to the new master",
              StateRegistering.STATE,
              StateRegistering.STATE,
              StructuredLogging.masterHostPort(
                  NodeUtils.getHostAndPortString(response.getParticipants().getMaster())),
              reconnectException);
          // Rethrow so the outer catch handles it like any other registration failure.
          throw reconnectException;
        }
        markSuccess(
            "successfully redirect master",
            StateRegistering.STATE,
            StateRegistering.STATE,
            Duration.ofNanos(System.nanoTime() - startNs),
            NodeUtils.getHostAndPortString(response.getParticipants().getMaster()),
            false);
        return StateRegistering.from(this, newControllerClient);
      }
    } catch (Exception e) {
      isFailure = true;
      infra.scope().counter(MetricsNames.REGISTER_WORKER_FAILURE).inc(1);
      markError(
          "failed to register worker with master",
          StateRegistering.STATE,
          StateConnecting.STATE,
          Duration.ofNanos(System.nanoTime() - startNs),
          NodeUtils.getHostAndPortString(controllerClient.getNode()),
          e);
      try {
        controllerClient.close();
      } catch (Exception closeException) {
        // No extra action is taken here because the shutdown process has already been
        // initiated. The exception indicates that the master client was not closed before a
        // pre-defined timeout, but the master client should eventually be closed.
        logger.warn(
            "failed to close the master client",
            StructuredLogging.masterHostPort(
                NodeUtils.getHostAndPortString(controllerClient.getNode())),
            closeException);
      }
      return StateConnecting.from(this);
    } finally {
      registerWorkerTimer.stop();
      // Only the plain "registered with the same master" path reaches here with the flag unset.
      if (!isFailure) {
        markSuccess(
            "successfully registered worker to master",
            StateRegistering.STATE,
            StateWorking.STATE,
            Duration.ofNanos(System.nanoTime() - startNs),
            String.format(
                "%s:%d",
                controllerClient.getNode().getHost(), controllerClient.getNode().getPort()),
            false);
      }
    }
  }