in flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java [70:96]
private void open() {
logger.debug("Open client to Doris BE '{}'.", routing);
TException ex = null;
for (int attempt = 0; attempt < retries; ++attempt) {
logger.debug("Attempt {} to connect {}.", attempt, routing);
try {
TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
transport = new TSocket(new TConfiguration(), routing.getHost(), routing.getPort(), socketTimeout, connectTimeout);
TProtocol protocol = factory.getProtocol(transport);
client = new TDorisExternalService.Client(protocol);
logger.trace("Connect status before open transport to {} is '{}'.", routing, isConnected);
if (!transport.isOpen()) {
transport.open();
isConnected = true;
logger.info("Success connect to {}.", routing);
break;
}
} catch (TTransportException e) {
logger.warn(ErrorMessages.CONNECT_FAILED_MESSAGE, routing, e);
ex = e;
}
}
if (!isConnected) {
logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing);
throw new ConnectedFailedException(routing.toString(), ex);
}
}