private void connect()

in tablestore/src/main/java/com/alicloud/openservices/tablestore/tunnel/worker/TunnelWorker.java [107:151]


    private void connect() {
        if (!workerStatus.compareAndSet(TunnelWorkerStatus.WORKER_READY, TunnelWorkerStatus.WORKER_STARTED)) {
            throw new ClientException(String.format("Tunnel worker has already been %s status", workerStatus));
        }
        while (true) {
            try {
                TunnelClientConfig conf = new TunnelClientConfig(workerConfig.getHeartbeatTimeoutInSec(),
                        workerConfig.getClientTag());
                ConnectTunnelRequest request = new ConnectTunnelRequest(tunnelId, conf);
                ConnectTunnelResponse resp = client.connectTunnel(request);
                this.clientId = resp.getClientId();
                // After ConnectTunnel succeeds, initialize the other dependent components of TunnelWorker.
                this.channelDialer = new ChannelDialer(client, workerConfig);
                // According to the Callback for processing data and CheckpointInterval (the interval for recording data checkpoints to the server) provided by the user.
                // Wraps a data processor with automatic Checkpoint recording functionality.
                if (factory == null) {
                    factory = new ChannelProcessFactory(workerConfig);
                }
                stateMachine = new TunnelStateMachine(
                        tunnelId,
                        clientId,
                        channelDialer,
                        factory,
                        client);
                stateMachine.setEnableClosingChannelDetect(workerConfig.isEnableClosingChannelDetect());
                LOG.info("Connect tunnel success, RequestId: {}, clientId: {}, tunnelId: {}", resp.getRequestId(), clientId, tunnelId);
                break;
            } catch (TableStoreException te) {
                LOG.warn("Connect tunnel failed, tunnel id {}, error detail {}", tunnelId, te.toString());
                if (isTunnelInvalid(te.getErrorCode())) {
                    LOG.error("Tunnel is expired or invalid, tunnel worker will be halted.");
                    workerStatus.set(TunnelWorkerStatus.WORKER_HALT);
                    throw te;
                }
            } catch (Exception e) {
                LOG.warn("Connect tunnel failed, tunnel id {}, error detail {}", tunnelId, e.toString());
            }

            try {
                Thread.sleep(new Random(System.currentTimeMillis()).nextInt(WORKER_RANDOM_RETRY_MILLIS) + 1);
            } catch (Exception e) {
                LOG.warn("Reconnect worker error, error detail: {}", e.toString());
            }
        }
    }