public void load()

in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java [227:269]


        public void load(String label, BatchRecordBuffer buffer) throws IOException{
            refreshLoadUrl();
            ByteBuffer data = buffer.getData();
            ByteArrayEntity entity = new ByteArrayEntity(data.array(), data.arrayOffset(), data.limit());
            HttpPutBuilder putBuilder = new HttpPutBuilder();
            putBuilder.setUrl(loadUrl)
                    .baseAuth(username, password)
                    .setLabel(label)
                    .addCommonHeader()
                    .setEntity(entity)
                    .addHiddenColumns(executionOptions.getDeletable())
                    .addProperties(executionOptions.getStreamLoadProp());

            int retry = 0;
            while (retry <= executionOptions.getMaxRetries()) {
                LOG.info("stream load started for {} on host {}", label, hostPort);
                try (CloseableHttpResponse response = httpClient.execute(putBuilder.build())) {
                    int statusCode = response.getStatusLine().getStatusCode();
                    if (statusCode == 200 && response.getEntity() != null) {
                        String loadResult = EntityUtils.toString(response.getEntity());
                        LOG.info("load Result {}", loadResult);
                        RespContent respContent = OBJECT_MAPPER.readValue(loadResult, RespContent.class);
                        if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
                            String errMsg = String.format("stream load error: %s, see more in %s", respContent.getMessage(), respContent.getErrorURL());
                            throw new DorisBatchLoadException(errMsg);
                        }else{
                            return;
                        }
                    }
                    LOG.error("stream load failed with {}, reason {}, to retry", hostPort, response.getStatusLine().toString());
                }catch (Exception ex){
                    if (retry == executionOptions.getMaxRetries()) {
                        throw new DorisBatchLoadException("stream load error: ", ex);
                    }
                    LOG.error("stream load error with {}, to retry, cause by", hostPort, ex);

                }
                retry++;
                // get available backend retry
                refreshLoadUrl();
                putBuilder.setUrl(loadUrl);
            }
        }