public void abortPreCommit()

in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java [136:182]


    public void abortPreCommit(String labelSuffix, long chkID) throws Exception {
        long startChkID = chkID;
        LOG.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, chkID);
        while (true) {
            try {
                String label = labelGenerator.generateLabel(startChkID);
                HttpPutBuilder builder = new HttpPutBuilder();
                builder.setUrl(loadUrlStr)
                        .baseAuth(user, passwd)
                        .addCommonHeader()
                        .enable2PC()
                        .setLabel(label)
                        .setEmptyEntity()
                        .addProperties(streamLoadProp);
                RespContent respContent = handlePreCommitResponse(httpClient.execute(builder.build()));
                Preconditions.checkState("true".equals(respContent.getTwoPhaseCommit()));
                if (LABEL_ALREADY_EXIST.equals(respContent.getStatus())) {
                    // label already exist and job finished
                    if (JOB_EXIST_FINISHED.equals(respContent.getExistingJobStatus())) {
                        throw new DorisException("Load status is " + LABEL_ALREADY_EXIST + " and load job finished, " +
                                "change you label prefix or restore from latest savepoint!");

                    }
                    // job not finished, abort.
                    Matcher matcher  = LABEL_EXIST_PATTERN.matcher(respContent.getMessage());
                    if (matcher.find()) {
                        Preconditions.checkState(label.equals(matcher.group(1)));
                        long txnId = Long.parseLong(matcher.group(2));
                        LOG.info("abort {} for exist label {}", txnId, label);
                        abortTransaction(txnId);
                    } else {
                        LOG.error("response: {}", respContent.toString());
                        throw new DorisException("Load Status is " + LABEL_ALREADY_EXIST + ", but no txnID associated with it!");
                    }
                } else {
                    LOG.info("abort {} for check label {}.", respContent.getTxnId(), label);
                    abortTransaction(respContent.getTxnId());
                    break;
                }
                startChkID++;
            } catch (Exception e) {
                LOG.warn("failed to stream load data", e);
                throw e;
            }
        }
        LOG.info("abort for labelSuffix {} finished", labelSuffix);
    }