public String uploadToInternalStage()

in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java [282:334]


        /**
         * Uploads the content of {@code data} to the internal stage with an HTTP PUT to {@code
         * address}.
         *
         * <p>The request is executed through {@code BackoffAndRetryUtils.backoffAndRetry} under the
         * {@code UPLOAD_FILE} operation. An attempt counts as successful only when the response
         * status is 200, a response entity is present and its body is empty; every other outcome
         * makes the attempt throw a {@link CopyLoadException}.
         *
         * @param address URL the data is PUT to
         * @param data buffer holding the payload; it must be backed by an accessible array because
         *     {@code data.array()} is read directly
         * @return the request id taken from the response headers, converted with {@code
         *     String.valueOf}
         * @throws CopyLoadException if the upload does not succeed; the underlying exception is
         *     attached as the cause
         */
        public String uploadToInternalStage(String address, ByteBuffer data)
                throws CopyLoadException {
            // NOTE(review): the length passed here is data.limit(), which equals the number of
            // readable bytes only when the buffer's position is 0 (e.g. right after flip()) —
            // confirm against callers.
            ByteArrayEntity entity =
                    new ByteArrayEntity(data.array(), data.arrayOffset(), data.limit());
            HttpPutBuilder putBuilder = new HttpPutBuilder();
            putBuilder.setUrl(address).addCommonHeader().setEntity(entity);
            HttpPut httpPut = putBuilder.build();
            try {
                Object result =
                        BackoffAndRetryUtils.backoffAndRetry(
                                BackoffAndRetryUtils.LoadOperation.UPLOAD_FILE,
                                () -> {
                                    // A new client is built for every attempt; both the client and
                                    // the response are closed when the attempt ends.
                                    try (CloseableHttpClient httpClient = httpClientBuilder.build();
                                            CloseableHttpResponse response =
                                                    httpClient.execute(httpPut)) {
                                        final int statusCode =
                                                response.getStatusLine().getStatusCode();
                                        String requestId = getRequestId(response.getAllHeaders());
                                        if (statusCode == 200 && response.getEntity() != null) {
                                            String loadResult =
                                                    EntityUtils.toString(response.getEntity());
                                            if (loadResult == null || loadResult.isEmpty()) {
                                                // An empty body on a 200 response means the upload
                                                // finished.
                                                return requestId;
                                            }
                                            // A non-empty body on a 200 response is treated as a
                                            // failure report.
                                            LOG.error(
                                                    "upload file failed, requestId is {}, response result: {}",
                                                    requestId,
                                                    loadResult);
                                            throw new CopyLoadException(
                                                    "upload file failed: "
                                                            + response.getStatusLine().toString()
                                                            + ", with requestId "
                                                            + requestId);
                                        }
                                        throw new CopyLoadException(
                                                "upload file error: "
                                                        + response.getStatusLine().toString()
                                                        + ", with requestId "
                                                        + requestId);
                                    }
                                });
                return String.valueOf(result);
            } catch (Exception ex) {
                LOG.error("Failed to upload data to internal stage ", ex);
                // Keep the original exception as the cause so its stack trace reaches the caller.
                // initCause is used because only the single-String constructor of
                // CopyLoadException is known from this code.
                CopyLoadException failure =
                        new CopyLoadException(
                                "Failed to upload data to internal stage, " + ex.getMessage());
                failure.initCause(ex);
                throw failure;
            }
        }