in flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java [117:144]
/**
 * Opens a scanner through the Thrift client, making up to {@code retries} attempts.
 *
 * <p>If {@code isConnected} is false, {@code open()} is called before the first attempt.
 * An attempt counts as failed when the Thrift call throws a {@link TException}, when the
 * returned result is {@code null}, or when the status code of the result is not
 * {@code TStatusCode.OK}. Every failed attempt is logged at WARN level.
 *
 * @param openParams parameters handed unchanged to the client's {@code openScanner} call
 * @return the first result whose status code equals {@code TStatusCode.OK}
 * @throws ConnectedFailedException if no attempt succeeds; its cause describes the last
 *         failed attempt (the cause is {@code null} only when no attempt was made at all,
 *         i.e. {@code retries} is not positive)
 */
public TScanOpenResult openScanner(TScanOpenParams openParams) {
    logger.debug("OpenScanner to '{}', parameter is '{}'.", routing, openParams);
    if (!isConnected) {
        open();
    }
    // Failure of the most recent attempt. It is overwritten on every failed attempt so the
    // exception thrown at the end never carries a missing or stale cause.
    TException ex = null;
    for (int attempt = 0; attempt < retries; ++attempt) {
        logger.debug("Attempt {} to openScanner {}.", attempt, routing);
        try {
            TScanOpenResult result = client.openScanner(openParams);
            if (result == null) {
                logger.warn("Open scanner result from {} is null.", routing);
                // No exception was raised by the call, so synthesize one to keep the reason.
                ex = new TException("Open scanner result from " + routing + " is null.");
                continue;
            }
            if (!TStatusCode.OK.equals(result.getStatus().getStatusCode())) {
                logger.warn("The status of open scanner result from {} is '{}', error message is: {}.",
                        routing, result.getStatus().getStatusCode(), result.getStatus().getErrorMsgs());
                // Preserve the reported status and messages for the caller, not only the log.
                ex = new TException("The status of open scanner result from " + routing
                        + " is '" + result.getStatus().getStatusCode()
                        + "', error message is: " + result.getStatus().getErrorMsgs() + ".");
                continue;
            }
            return result;
        } catch (TException e) {
            logger.warn("Open scanner from {} failed.", routing, e);
            ex = e;
        }
    }
    logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing);
    throw new ConnectedFailedException(routing.toString(), ex);
}