in flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java [136:183]
public TScanOpenResult openScanner(TScanOpenParams openParams) {
logger.debug("OpenScanner to '{}', parameter is '{}'.", routing, openParams);
if (!isConnected) {
open();
}
TException ex = null;
TScanOpenResult result = null;
for (int attempt = 0; attempt < retries; ++attempt) {
logger.debug("Attempt {} to openScanner {}.", attempt, routing);
try {
result = client.openScanner(openParams);
if (result == null) {
logger.warn("Open scanner result from {} is null.", routing);
continue;
}
if (!TStatusCode.OK.equals(result.getStatus().getStatusCode())) {
logger.warn(
"The status of open scanner result from {} is '{}', error message is: {}.",
routing,
result.getStatus().getStatusCode(),
result.getStatus().getErrorMsgs());
continue;
}
logger.info(
"OpenScanner success for Doris BE '{}' with contextId '{}' for tablets '{}'.",
routing,
result.getContextId(),
openParams.tablet_ids);
return result;
} catch (TException e) {
logger.warn("Open scanner from {} failed.", routing, e);
ex = e;
}
}
if (result != null && (TStatusCode.OK != (result.getStatus().getStatusCode()))) {
logger.error(
ErrorMessages.DORIS_INTERNAL_FAIL_MESSAGE,
routing,
result.getStatus().getStatusCode(),
result.getStatus().getErrorMsgs());
throw new DorisInternalException(
routing.toString(),
result.getStatus().getStatusCode(),
result.getStatus().getErrorMsgs());
}
logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing);
throw new ConnectedFailedException(routing.toString(), ex);
}