private static String send()

in src/main/java/org/apache/doris/kafka/connector/service/RestService.java [135:219]


    /**
     * Sends {@code request} to a Doris FE and returns the response payload as a string.
     *
     * <p>The connect and socket timeouts read from {@code options} are applied to the request
     * before it is sent. The actual exchange is executed through
     * {@code BackoffAndRetryUtils.backoffAndRetry} for the {@code SEND_REQUEST_TO_DORIS}
     * operation; {@link HttpGet} requests go through {@code getConnectionGet}, every other
     * request type through {@code getConnectionPost}.
     *
     * <p>If the response is a JSON object containing both a {@code code} and a {@code msg} key,
     * only the value of its {@code data} key is re-serialized and returned; otherwise the raw
     * response text is returned unchanged.
     *
     * @param options supplies the timeouts and the user/password used for the request
     * @param request the HTTP request to execute; its config is overwritten with the timeouts
     * @param logger logger used for all diagnostics emitted by this call
     * @return the payload stored by the last successful attempt (NOTE(review): may be
     *     {@code null} if the retry helper returns without a successful attempt — confirm
     *     against {@code BackoffAndRetryUtils})
     * @throws ConnectedFailedException if the retried operation ends with any exception
     */
    private static String send(DorisOptions options, HttpRequestBase request, Logger logger)
            throws ConnectedFailedException {
        int connectTimeout = options.getRequestConnectTimeoutMs();
        int socketTimeout = options.getRequestReadTimeoutMs();
        logger.trace(
                "connect timeout set to '{}'. socket timeout set to '{}'.",
                connectTimeout,
                socketTimeout);

        RequestConfig requestConfig =
                RequestConfig.custom()
                        .setConnectTimeout(connectTimeout)
                        .setSocketTimeout(socketTimeout)
                        .build();

        request.setConfig(requestConfig);
        logger.info(
                "Send request to Doris FE '{}' with user '{}'.",
                request.getURI(),
                options.getUser());
        // Never updated in this method: the HTTP status is not surfaced by the connection
        // helpers used below, so it is always reported as the placeholder -1.
        final int statusCode = -1;
        final AtomicReference<String> result = new AtomicReference<>();
        // Created once per call instead of once per retry attempt.
        final ObjectMapper mapper = new ObjectMapper();
        try {
            BackoffAndRetryUtils.backoffAndRetry(
                    LoadOperation.SEND_REQUEST_TO_DORIS,
                    () -> {
                        logger.debug("doris request {}.", request.getURI());
                        try {
                            final String response =
                                    request instanceof HttpGet
                                            ? getConnectionGet(
                                                    request,
                                                    options.getUser(),
                                                    options.getPassword(),
                                                    logger)
                                            : getConnectionPost(
                                                    request,
                                                    options.getUser(),
                                                    options.getPassword(),
                                                    logger);
                            if (Objects.isNull(response)) {
                                logger.warn(
                                        "Failed to get response from Doris FE {}, http code is {}",
                                        request.getURI(),
                                        statusCode);
                                throw new ConnectedFailedException(
                                        "Failed to get response from Doris FE {"
                                                + request.getURI()
                                                + "}, http code is {"
                                                + statusCode
                                                + "}");
                            }
                            logger.trace(
                                    "Success get response from Doris FE: {}, response is: {}.",
                                    request.getURI(),
                                    response);
                            // Handle the problem of inconsistent data format returned by http v1
                            // and v2
                            final Map<?, ?> body = mapper.readValue(response, Map.class);
                            if (body == null) {
                                // A body that is the JSON literal `null` deserializes to a null
                                // map; fail with a descriptive error instead of a bare NPE.
                                logger.warn(
                                        "Response from Doris FE {} is JSON null", request.getURI());
                                throw new ConnectedFailedException(
                                        "Response from Doris FE {"
                                                + request.getURI()
                                                + "} is JSON null");
                            }
                            if (body.containsKey("code") && body.containsKey("msg")) {
                                // v2 envelope: hand back only the wrapped "data" value.
                                result.set(mapper.writeValueAsString(body.get("data")));
                            } else {
                                // No envelope: the response already is the payload.
                                result.set(response);
                            }
                            return true;
                        } catch (IOException e) {
                            // Covers both transport failures and JSON parsing failures.
                            logger.warn(
                                    "Failed to connect doris, requestUri={}", request.getURI(), e);
                            throw new ConnectedFailedException(
                                    "Failed to connect doris, requestUri=" + request.getURI(), e);
                        }
                    });
        } catch (Exception e) {
            logger.error("Connect to doris {} failed.", request.getURI(), e);
            throw new ConnectedFailedException(
                    "Failed to connect doris request uri=" + request.getURI(), statusCode, e);
        }
        return result.get();
    }