in src/main/java/org/apache/doris/kafka/connector/service/RestService.java [135:219]
/**
 * Sends {@code request} to Doris FE and returns the response payload as a string.
 *
 * <p>The connect and socket timeouts read from {@code options} are applied to the request
 * before it is executed. Execution is delegated to {@code BackoffAndRetryUtils.backoffAndRetry}
 * under {@code LoadOperation.SEND_REQUEST_TO_DORIS}; the retry policy itself lives in that
 * utility and is not visible here. {@code HttpGet} requests go through {@code getConnectionGet},
 * every other request type through {@code getConnectionPost}.
 *
 * <p>If the response is a JSON object containing both a {@code code} and a {@code msg} key, only
 * the value of its {@code data} key is returned, re-serialized as JSON. Otherwise the raw
 * response text is returned unchanged.
 *
 * @param options supplies the timeouts, user and password used for the request
 * @param request the HTTP request to execute; its config is overwritten with the timeouts
 * @param logger logger receiving trace/debug/info/warn/error output for this call
 * @return the unwrapped {@code data} JSON, or the raw response body
 * @throws ConnectedFailedException if {@code backoffAndRetry} ends with any exception
 */
private static String send(DorisOptions options, HttpRequestBase request, Logger logger)
        throws ConnectedFailedException {
    int connectTimeout = options.getRequestConnectTimeoutMs();
    int socketTimeout = options.getRequestReadTimeoutMs();
    logger.trace(
            "connect timeout set to '{}'. socket timeout set to '{}'.",
            connectTimeout,
            socketTimeout);
    RequestConfig requestConfig =
            RequestConfig.custom()
                    .setConnectTimeout(connectTimeout)
                    .setSocketTimeout(socketTimeout)
                    .build();
    request.setConfig(requestConfig);
    logger.info(
            "Send request to Doris FE '{}' with user '{}'.",
            request.getURI(),
            options.getUser());
    // NOTE(review): nothing in this method ever assigns a real HTTP status (the connection
    // helpers only hand back the body), so every message below reports -1.
    final int statusCode = -1;
    final AtomicReference<String> result = new AtomicReference<>();
    // Built once per call instead of once per invocation of the retry callback.
    final ObjectMapper mapper = new ObjectMapper();
    try {
        BackoffAndRetryUtils.backoffAndRetry(
                LoadOperation.SEND_REQUEST_TO_DORIS,
                () -> {
                    logger.debug("doris request {}.", request.getURI());
                    try {
                        final String response;
                        if (request instanceof HttpGet) {
                            response =
                                    getConnectionGet(
                                            request,
                                            options.getUser(),
                                            options.getPassword(),
                                            logger);
                        } else {
                            response =
                                    getConnectionPost(
                                            request,
                                            options.getUser(),
                                            options.getPassword(),
                                            logger);
                        }
                        if (response == null) {
                            logger.warn(
                                    "Failed to get response from Doris FE {}, http code is {}",
                                    request.getURI(),
                                    statusCode);
                            throw new ConnectedFailedException(
                                    "Failed to get response from Doris FE {"
                                            + request.getURI()
                                            + "}, http code is {"
                                            + statusCode
                                            + "}");
                        }
                        logger.trace(
                                "Success get response from Doris FE: {}, response is: {}.",
                                request.getURI(),
                                response);
                        // Handle the problem of inconsistent data format returned by http v1
                        // and v2
                        Map<?, ?> map = mapper.readValue(response, Map.class);
                        if (map == null) {
                            // The JSON literal `null` deserializes to a null Map; fail with a
                            // descriptive error instead of a NullPointerException below.
                            throw new ConnectedFailedException(
                                    "Unexpected null JSON response from Doris FE, requestUri="
                                            + request.getURI());
                        }
                        if (map.containsKey("code") && map.containsKey("msg")) {
                            // Wrapped format: hand back only the "data" member as JSON text.
                            Object data = map.get("data");
                            result.set(mapper.writeValueAsString(data));
                        } else {
                            result.set(response);
                        }
                        return true;
                    } catch (IOException e) {
                        logger.warn(
                                "Failed to connect doris, requestUri={}", request.getURI(), e);
                        throw new ConnectedFailedException(
                                "Failed to connect doris, requestUri=" + request.getURI(), e);
                    }
                });
    } catch (Exception e) {
        logger.error("Connect to doris {} failed.", request.getURI(), e);
        throw new ConnectedFailedException(
                "Failed to connect doris request uri=" + request.getURI(), statusCode, e);
    }
    return result.get();
}