private static String send()

in flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java [111:186]


    /**
     * Sends an HTTP request to a Doris FE and returns the response payload as a JSON string.
     *
     * <p>The connect timeout, socket timeout and retry count are read from {@code readOptions};
     * when a value is unset ({@code null}) the corresponding {@code ConfigurationOptions} default
     * is used. The request is attempted up to {@code retries} times. An attempt that yields a
     * {@code null} response or raises an {@link IOException} (including a response body that
     * cannot be parsed as a JSON object) is logged and followed by the next attempt.
     *
     * <p>If the parsed response contains both a {@code "code"} and a {@code "msg"} key, only the
     * value stored under {@code "data"} is returned, re-serialized as JSON; otherwise the raw
     * response text is returned unchanged.
     *
     * @param options connection options; supplies the user name that is logged and is forwarded
     *     to the GET/POST helper
     * @param readOptions read options supplying the optional timeout and retry overrides
     * @param request the request to execute; its request config is overwritten with the resolved
     *     timeouts
     * @param logger logger used by this method and forwarded to the GET/POST helper
     * @return the response body, or the JSON serialization of its {@code "data"} member
     * @throws ConnectedFailedException if no attempt produced a usable response, which includes
     *     the case where {@code retries} is not positive and no attempt is made at all
     */
    private static String send(
            DorisOptions options,
            DorisReadOptions readOptions,
            HttpRequestBase request,
            Logger logger)
            throws ConnectedFailedException {
        int connectTimeout =
                readOptions.getRequestConnectTimeoutMs() == null
                        ? ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT
                        : readOptions.getRequestConnectTimeoutMs();
        int socketTimeout =
                readOptions.getRequestReadTimeoutMs() == null
                        ? ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT
                        : readOptions.getRequestReadTimeoutMs();
        int retries =
                readOptions.getRequestRetries() == null
                        ? ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT
                        : readOptions.getRequestRetries();
        logger.trace(
                "connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'.",
                connectTimeout,
                socketTimeout,
                retries);

        RequestConfig requestConfig =
                RequestConfig.custom()
                        .setConnectTimeout(connectTimeout)
                        .setSocketTimeout(socketTimeout)
                        .build();

        request.setConfig(requestConfig);
        logger.info(
                "Send request to Doris FE '{}' with user '{}'.",
                request.getURI(),
                options.getUsername());
        // Last I/O failure seen; stays null when every attempt merely returned a null response.
        IOException ex = null;
        // Never reassigned in this method, so the warning below and the final exception
        // always report -1.
        int statusCode = -1;

        for (int attempt = 0; attempt < retries; attempt++) {
            logger.debug("Attempt {} to request {}.", attempt, request.getURI());
            try {
                String response;
                if (request instanceof HttpGet) {
                    response = getConnectionGet(request, options, logger);
                } else {
                    response = getConnectionPost(request, options, logger);
                }
                if (response == null) {
                    logger.warn(
                            "Failed to get response from Doris FE {}, http code is {}",
                            request.getURI(),
                            statusCode);
                    continue;
                }
                logger.trace(
                        "Success get response from Doris FE: {}, response is: {}.",
                        request.getURI(),
                        response);
                // Handle the problem of inconsistent data format returned by http v1 and v2
                ObjectMapper mapper = new ObjectMapper();
                Map<?, ?> map = mapper.readValue(response, Map.class);
                // A body consisting of the JSON literal null deserializes to a null map; guard
                // against it so it is passed through like any other non-wrapped response instead
                // of escaping as a NullPointerException.
                if (map != null && map.containsKey("code") && map.containsKey("msg")) {
                    Object data = map.get("data");
                    return mapper.writeValueAsString(data);
                }
                return response;
            } catch (IOException e) {
                // Remember the failure for the final exception and move on to the next attempt.
                ex = e;
                logger.warn(CONNECT_FAILED_MESSAGE, request.getURI(), e);
            }
        }

        logger.error(CONNECT_FAILED_MESSAGE, request.getURI(), ex);
        throw new ConnectedFailedException(request.getURI().toString(), statusCode, ex);
    }