in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/controller/StateRegistering.java [65:159]
// Tries to register this worker with the master that controllerClient is connected to and picks
// the next state from the outcome:
//   - the response names the same master as controllerClient.getNode(): the returned worker id
//     is validated, the lease is marked successful and a StateWorking is returned;
//   - the response names a different master: the redirect counter is incremented, the client is
//     reconnected via controllerClientFactory.reconnectOnChange and a new StateRegistering bound
//     to the new client is returned;
//   - any Exception (including a failed reconnect): the failure counter is incremented, the
//     error is marked, controllerClient is closed on a best-effort basis and a StateConnecting
//     is returned.
// The register-worker latency timer is stopped on every path.
public State nextState() {
  // Success is opt-in: the flag is raised only once registration with the current master has
  // fully completed. A Throwable that is not an Exception bypasses the catch block below, and
  // with an opt-out "isFailure" flag it would have been reported as a successful registration
  // by the finally block.
  boolean registered = false;
  long startNs = System.nanoTime();
  Stopwatch registerWorkerTimer =
      infra.scope().timer(MetricsNames.REGISTER_WORKER_LATENCY).start();
  try {
    RegisterWorkerRequest request =
        RegisterWorkerRequest.newBuilder()
            .setParticipants(
                Participants.newBuilder()
                    .setMaster(controllerClient.getNode())
                    .setWorker(worker)
                    .build())
            .build();
    RegisterWorkerResponse response = controllerClient.getStub().registerWorker(request);
    if (response.getParticipants().getMaster().equals(controllerClient.getNode())) {
      // The master we asked is the one that answered: registration is accepted.
      assertValidWorker(response.getParticipants().getWorker().getId());
      lease.success();
      State workingState =
          StateWorking.from(this, response.getParticipants().getWorker(), controllerClient);
      // Nothing after this point can throw, so it is now safe to report success in finally.
      registered = true;
      return workingState;
    } else {
      // The response points at a different master: count the redirect, tagged with both hosts.
      infra
          .scope()
          .tagged(
              ImmutableMap.of(
                  StructuredLogging.FROM_MASTER_HOST,
                  controllerClient.getNode().getHost(),
                  StructuredLogging.TO_MASTER_HOST,
                  response.getParticipants().getMaster().getHost()))
          .counter(MetricsNames.REGISTER_WORKER_REDIRECT)
          .inc(1);
      // Definitely assigned after the try statement because the catch block always rethrows.
      ControllerClient newControllerClient;
      try {
        newControllerClient =
            controllerClientFactory.reconnectOnChange(
                this.controllerClient, response.getParticipants().getMaster());
      } catch (Exception reconnectException) {
        logger.error(
            "[{} -> {}] "
                + "got redirect master response but failed to reconnect to the new master",
            StateRegistering.STATE,
            StateRegistering.STATE,
            StructuredLogging.masterHostPort(
                NodeUtils.getHostAndPortString(response.getParticipants().getMaster())),
            reconnectException);
        // Rethrown so the outer catch handles it like any other registration failure.
        throw reconnectException;
      }
      // The redirect is reported here with its own message and target state; the finally block
      // stays silent for this path because "registered" is still false.
      markSuccess(
          "successfully redirect master",
          StateRegistering.STATE,
          StateRegistering.STATE,
          Duration.between(startNs, System.nanoTime()),
          NodeUtils.getHostAndPortString(response.getParticipants().getMaster()),
          false);
      return StateRegistering.from(this, newControllerClient);
    }
  } catch (Exception e) {
    infra.scope().counter(MetricsNames.REGISTER_WORKER_FAILURE).inc(1);
    markError(
        "failed to register worker with master",
        StateRegistering.STATE,
        StateConnecting.STATE,
        Duration.between(startNs, System.nanoTime()),
        NodeUtils.getHostAndPortString(controllerClient.getNode()),
        e);
    try {
      controllerClient.close();
    } catch (Exception closeException) {
      // We don't take any extra action here because we have initiated the shutdown process;
      // the exception indicates that the master client was not successfully closed before a
      // pre-defined timeout, but the master client should eventually be closed.
      logger.warn(
          "failed to close the master client",
          StructuredLogging.masterHostPort(
              NodeUtils.getHostAndPortString(controllerClient.getNode())),
          closeException);
    }
    return StateConnecting.from(this);
  } finally {
    // Stop the latency timer first so it does not include the success bookkeeping below.
    registerWorkerTimer.stop();
    if (registered) {
      markSuccess(
          "successfully registered worker to master",
          StateRegistering.STATE,
          StateWorking.STATE,
          Duration.between(startNs, System.nanoTime()),
          String.format(
              "%s:%d",
              controllerClient.getNode().getHost(), controllerClient.getNode().getPort()),
          false);
    }
  }
}