in flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java [111:186]
/**
 * Sends {@code request} to a Doris FE and returns the response payload as a JSON string.
 *
 * <p>The connect timeout, socket (read) timeout and attempt count are read from
 * {@code readOptions}; any value that is {@code null} falls back to the corresponding
 * {@code ConfigurationOptions} default. The timeouts are applied to {@code request} before it
 * is dispatched.
 *
 * <p>Each attempt dispatches through {@code getConnectionGet} when the request is an
 * {@code HttpGet} and through {@code getConnectionPost} otherwise. A non-null response is
 * parsed as a JSON object: if it carries both a {@code "code"} and a {@code "msg"} key, only
 * its {@code "data"} member is returned (re-serialized as JSON); otherwise the response text
 * is returned unchanged. A {@code null} response or an {@link IOException} (including a JSON
 * parse failure) is logged and the next attempt is made.
 *
 * @param options connection options; passed to the connection helpers and used to log the
 *     user name
 * @param readOptions source of the optional timeout and retry settings
 * @param request the HTTP request to send; its {@code RequestConfig} is overwritten here
 * @param logger logger that receives the trace/debug/info/warn/error output of this call
 * @return the normalized response body
 * @throws ConnectedFailedException if no attempt produced a usable response; the cause is the
 *     last recorded failure
 */
private static String send(
        DorisOptions options,
        DorisReadOptions readOptions,
        HttpRequestBase request,
        Logger logger)
        throws ConnectedFailedException {
    int connectTimeout =
            readOptions.getRequestConnectTimeoutMs() == null
                    ? ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT
                    : readOptions.getRequestConnectTimeoutMs();
    int socketTimeout =
            readOptions.getRequestReadTimeoutMs() == null
                    ? ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT
                    : readOptions.getRequestReadTimeoutMs();
    int retries =
            readOptions.getRequestRetries() == null
                    ? ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT
                    : readOptions.getRequestRetries();
    logger.trace(
            "connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'.",
            connectTimeout,
            socketTimeout,
            retries);

    RequestConfig requestConfig =
            RequestConfig.custom()
                    .setConnectTimeout(connectTimeout)
                    .setSocketTimeout(socketTimeout)
                    .build();
    request.setConfig(requestConfig);

    logger.info(
            "Send request to Doris FE '{}' with user '{}'.",
            request.getURI(),
            options.getUsername());

    // A configured value <= 0 would skip the loop entirely and report a connection failure
    // without ever contacting the FE, so always make at least one attempt.
    int maxAttempts = Math.max(1, retries);
    IOException ex = null;
    // Nothing in this method obtains an HTTP status from the connection helpers, so this
    // stays at the -1 placeholder in the log line and exception below.
    final int statusCode = -1;

    for (int attempt = 0; attempt < maxAttempts; attempt++) {
        logger.debug("Attempt {} to request {}.", attempt, request.getURI());
        try {
            String response =
                    request instanceof HttpGet
                            ? getConnectionGet(request, options, logger)
                            : getConnectionPost(request, options, logger);
            if (response == null) {
                logger.warn(
                        "Failed to get response from Doris FE {}, http code is {}",
                        request.getURI(),
                        statusCode);
                // Record the failure so the final exception carries a cause even when no
                // IOException was thrown by the helper.
                ex = new IOException("No response received from Doris FE " + request.getURI());
                continue;
            }
            logger.trace(
                    "Success get response from Doris FE: {}, response is: {}.",
                    request.getURI(),
                    response);
            // Handle the problem of inconsistent data format returned by http v1 and v2:
            // an envelope with "code" and "msg" is unwrapped to its "data" member.
            ObjectMapper mapper = new ObjectMapper();
            Map<?, ?> map = mapper.readValue(response, Map.class);
            if (map.containsKey("code") && map.containsKey("msg")) {
                Object data = map.get("data");
                return mapper.writeValueAsString(data);
            }
            return response;
        } catch (IOException e) {
            ex = e;
            logger.warn(CONNECT_FAILED_MESSAGE, request.getURI(), e);
        }
    }

    logger.error(CONNECT_FAILED_MESSAGE, request.getURI(), ex);
    throw new ConnectedFailedException(request.getURI().toString(), statusCode, ex);
}