public void load()

in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java [457:564]


        public void load(String label, BatchRecordBuffer buffer) throws IOException {
            if (enableGroupCommit) {
                label = null;
            }
            refreshLoadUrl(buffer.getDatabase(), buffer.getTable());

            BatchBufferHttpEntity entity = new BatchBufferHttpEntity(buffer);
            HttpPutBuilder putBuilder = new HttpPutBuilder();
            putBuilder
                    .setUrl(loadUrl)
                    .baseAuth(username, password)
                    .setLabel(label)
                    .addCommonHeader()
                    .setEntity(entity)
                    .addHiddenColumns(executionOptions.getDeletable())
                    .addProperties(executionOptions.getStreamLoadProp());

            if (enableGzCompress) {
                putBuilder.setEntity(new GzipCompressingEntity(entity));
            }
            Throwable resEx = new Throwable();
            int retry = 0;
            while (retry <= executionOptions.getMaxRetries()) {
                if (enableGroupCommit) {
                    LOG.info("stream load started with group commit on host {}", hostPort);
                } else {
                    LOG.info(
                            "stream load started for {} on host {}",
                            putBuilder.getLabel(),
                            hostPort);
                }

                try (CloseableHttpClient httpClient = httpClientBuilder.build()) {
                    try (CloseableHttpResponse response = httpClient.execute(putBuilder.build())) {
                        int statusCode = response.getStatusLine().getStatusCode();
                        String reason = response.getStatusLine().toString();
                        if (statusCode == 200 && response.getEntity() != null) {
                            String loadResult = EntityUtils.toString(response.getEntity());
                            LOG.info("load Result {}", loadResult);
                            RespContent respContent =
                                    OBJECT_MAPPER.readValue(loadResult, RespContent.class);
                            if (DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
                                long cacheByteBeforeFlush =
                                        currentCacheBytes.getAndAdd(-respContent.getLoadBytes());
                                LOG.info(
                                        "load success, cacheBeforeFlushBytes: {}, currentCacheBytes : {}",
                                        cacheByteBeforeFlush,
                                        currentCacheBytes.get());
                                lock.lock();
                                try {
                                    block.signal();
                                } finally {
                                    lock.unlock();
                                }
                                return;
                            } else {
                                String errMsg = null;
                                if (StringUtils.isBlank(respContent.getMessage())
                                        && StringUtils.isBlank(respContent.getErrorURL())) {
                                    // sometimes stream load will not return message
                                    errMsg =
                                            String.format(
                                                    "stream load error, response is %s",
                                                    loadResult);
                                    throw new DorisBatchLoadException(errMsg);
                                } else {
                                    errMsg =
                                            String.format(
                                                    "stream load error: %s, see more in %s",
                                                    respContent.getMessage(),
                                                    respContent.getErrorURL());
                                }
                                throw new DorisBatchLoadException(errMsg);
                            }
                        }
                        LOG.error(
                                "stream load failed with {}, reason {}, to retry",
                                hostPort,
                                reason);
                        if (retry == executionOptions.getMaxRetries()) {
                            resEx = new DorisRuntimeException("stream load failed with: " + reason);
                        }
                    } catch (Exception ex) {
                        resEx = ex;
                        LOG.error("stream load error with {}, to retry, cause by", hostPort, ex);
                    }
                }
                retry++;
                // get available backend retry
                refreshLoadUrl(buffer.getDatabase(), buffer.getTable());
                putBuilder.setUrl(loadUrl);
                putBuilder.setLabel(label + "_" + retry);

                try {
                    Thread.sleep(retry * 1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            buffer.clear();
            buffer = null;

            if (retry >= executionOptions.getMaxRetries()) {
                throw new DorisBatchLoadException(
                        "stream load error: " + resEx.getMessage(), resEx);
            }
        }