in spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendThriftClient.java [70:107]
private void open() throws ConnectedFailedException {
logger.debug("Open client to Doris BE '{}'.", backend);
Exception ex = null;
for (int attempt = 0; !isConnected && attempt < retries; ++attempt) {
logger.debug("Attempt {} to connect {}.", attempt, backend);
try {
TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
TConfiguration tConf = new TConfiguration();
Integer maxMessageSize = config.getValue(DorisOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE);
tConf.setMaxMessageSize(maxMessageSize);
Integer connectTimeout = config.getValue(DorisOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS);
Integer socketTimeout = config.getValue(DorisOptions.DORIS_REQUEST_READ_TIMEOUT_MS);
logger.trace("connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'.",
connectTimeout, socketTimeout, this.retries);
transport = new TSocket(tConf, backend.getHost(), backend.getRpcPort(), socketTimeout, connectTimeout);
TProtocol protocol = factory.getProtocol(transport);
client = new TDorisExternalService.Client(protocol);
logger.trace("Connect status before open transport to {} is '{}'.", backend, isConnected);
if (!transport.isOpen()) {
transport.open();
isConnected = true;
}
} catch (TTransportException e) {
logger.warn(ErrorMessages.CONNECT_FAILED_MESSAGE, backend, e);
ex = e;
} catch (OptionRequiredException e) {
ex = e;
}
if (isConnected) {
logger.info("Success connect to {}.", backend);
break;
}
}
if (!isConnected) {
logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, backend);
throw new ConnectedFailedException(backend.toString(), ex);
}
}