in tablestore/src/main/java/com/alicloud/openservices/tablestore/tunnel/worker/TunnelWorker.java [107:151]
/**
 * Connects this worker to the tunnel and builds the components that depend on the connection.
 *
 * <p>The worker status is first moved from {@code WORKER_READY} to {@code WORKER_STARTED}. The
 * method then calls {@code connectTunnel} in a loop until it succeeds, sleeping a random
 * 1..{@code WORKER_RANDOM_RETRY_MILLIS} milliseconds between attempts. On success it stores the
 * returned client id and creates the {@code ChannelDialer}, the {@code ChannelProcessFactory}
 * (only if none is set yet) and the {@code TunnelStateMachine}.
 *
 * @throws ClientException if the worker status is not {@code WORKER_READY} on entry, or if the
 *         calling thread is interrupted while waiting between retries (the thread's interrupt
 *         flag is restored before throwing)
 * @throws TableStoreException if the connect call fails with an error code for which
 *         {@code isTunnelInvalid} returns true; the worker status is set to
 *         {@code WORKER_HALT} before the exception is rethrown
 */
private void connect() {
    if (!workerStatus.compareAndSet(TunnelWorkerStatus.WORKER_READY, TunnelWorkerStatus.WORKER_STARTED)) {
        throw new ClientException(String.format("Tunnel worker has already been %s status", workerStatus));
    }
    // One default-seeded generator for the whole retry loop. Seeding a fresh Random with the
    // current time on every attempt gives identical delays to workers that fail in the same
    // millisecond, which defeats the purpose of the jitter.
    Random retryJitter = new Random();
    while (true) {
        try {
            TunnelClientConfig conf = new TunnelClientConfig(workerConfig.getHeartbeatTimeoutInSec(),
                workerConfig.getClientTag());
            ConnectTunnelRequest request = new ConnectTunnelRequest(tunnelId, conf);
            ConnectTunnelResponse resp = client.connectTunnel(request);
            this.clientId = resp.getClientId();
            // After ConnectTunnel succeeds, initialize the other dependent components of TunnelWorker.
            this.channelDialer = new ChannelDialer(client, workerConfig);
            // Keep a factory that is already set; otherwise build the default one from the
            // worker configuration.
            if (factory == null) {
                factory = new ChannelProcessFactory(workerConfig);
            }
            stateMachine = new TunnelStateMachine(
                tunnelId,
                clientId,
                channelDialer,
                factory,
                client);
            stateMachine.setEnableClosingChannelDetect(workerConfig.isEnableClosingChannelDetect());
            LOG.info("Connect tunnel success, RequestId: {}, clientId: {}, tunnelId: {}", resp.getRequestId(), clientId, tunnelId);
            break;
        } catch (TableStoreException te) {
            LOG.warn("Connect tunnel failed, tunnel id {}, error detail {}", tunnelId, te.toString());
            // An invalid tunnel cannot be fixed by retrying: halt the worker and propagate.
            if (isTunnelInvalid(te.getErrorCode())) {
                LOG.error("Tunnel is expired or invalid, tunnel worker will be halted.");
                workerStatus.set(TunnelWorkerStatus.WORKER_HALT);
                throw te;
            }
        } catch (Exception e) {
            // Any other failure is treated as transient and retried after the back-off below.
            LOG.warn("Connect tunnel failed, tunnel id {}, error detail {}", tunnelId, e.toString());
        }
        try {
            // Random back-off in the range [1, WORKER_RANDOM_RETRY_MILLIS] milliseconds.
            Thread.sleep(retryJitter.nextInt(WORKER_RANDOM_RETRY_MILLIS) + 1);
        } catch (InterruptedException ie) {
            LOG.warn("Reconnect worker error, error detail: {}", ie.toString());
            // Restore the interrupt flag and stop retrying. Swallowing the interrupt would make
            // this loop impossible to cancel; continuing with the flag set would make every
            // later sleep fail immediately and turn the loop into a hot spin.
            Thread.currentThread().interrupt();
            throw new ClientException("Interrupted while waiting to reconnect tunnel " + tunnelId, ie);
        }
    }
}