public void load()

in src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java [72:132]


    /**
     * Sends the buffered records to Doris in a single stream load HTTP PUT request and records
     * the parsed response.
     *
     * <p>On success the response is tagged with the target database, table, topic and the
     * buffer's last offset, then appended to {@code respContents}.
     *
     * @param label label attached to the request; replaced by {@code null} when group commit is
     *     enabled
     * @param buffer source of the payload (sent as UTF-8 bytes) and of the last offset stored on
     *     the response
     * @throws IOException declared by the signature; failures while executing the request or
     *     handling the response are rethrown as {@link StreamLoadException} instead
     * @throws StreamLoadException if the request fails, the HTTP status is not 200, the response
     *     has no body or no message, or the reported status is not in {@code
     *     DORIS_SUCCESS_STATUS}
     */
    public void load(String label, RecordBuffer buffer) throws IOException {
        // With group commit enabled the request is sent without a label.
        final String effectiveLabel = enableGroupCommit ? null : label;

        refreshLoadUrl(database, table);
        String data = buffer.getData();
        ByteArrayEntity entity = new ByteArrayEntity(data.getBytes(StandardCharsets.UTF_8));
        HttpPutBuilder putBuilder = new HttpPutBuilder();
        putBuilder
                .setUrl(loadUrl)
                .baseAuth(user, password)
                .setLabel(effectiveLabel)
                .addCommonHeader()
                .setEntity(entity)
                .addHiddenColumns(dorisOptions.isEnableDelete())
                .enable2PC(dorisOptions.enable2PC())
                .addProperties(dorisOptions.getStreamLoadProp());

        if (enableGroupCommit) {
            LOG.info("stream load started with group commit on host {}", hostPort);
        } else {
            LOG.info("stream load started for {} on host {}", effectiveLabel, hostPort);
        }

        try (CloseableHttpResponse response = httpClient.execute(putBuilder.build())) {
            int statusCode = response.getStatusLine().getStatusCode();
            if (statusCode != 200 || response.getEntity() == null) {
                // A non-200 status or an empty body must fail the load; returning normally here
                // would let the caller treat an unloaded batch as loaded.
                throw new StreamLoadException("stream load error: " + response.getStatusLine());
            }

            // UTF-8 is only the fallback used when the response does not declare a charset.
            String loadResult = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
            LOG.info("load Result {}", loadResult);
            KafkaRespContent respContent =
                    OBJECT_MAPPER.readValue(loadResult, KafkaRespContent.class);
            if (respContent == null || respContent.getMessage() == null) {
                throw new StreamLoadException("response error : " + loadResult);
            }
            if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
                String errMsg =
                        String.format(
                                "stream load error: %s, see more in %s",
                                respContent.getMessage(), respContent.getErrorURL());
                throw new StreamLoadException(errMsg);
            }
            respContent.setDatabase(database);
            respContent.setTable(table);
            respContent.setLastOffset(buffer.getLastOffset());
            respContent.setTopic(topic);
            respContents.add(respContent);
        } catch (Exception ex) {
            // Every failure above, including the StreamLoadExceptions thrown in the try block,
            // is logged and rethrown with the label (or group-commit) context attached.
            String err;
            if (enableGroupCommit) {
                err = "failed to stream load data with group commit";
            } else {
                err = "failed to stream load data with label: " + effectiveLabel;
            }

            LOG.warn(err, ex);
            throw new StreamLoadException(err, ex);
        }
    }