public void abortPreCommit()

in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java [193:253]


    /**
     * Aborts pending pre-committed transactions for consecutive checkpoint labels, starting at
     * {@code chkID}.
     *
     * <p>For each checkpoint id, a label is generated and a request built with {@code enable2PC()}
     * and {@code setEmptyEntity()} is sent under that label. The response decides what happens
     * next:
     *
     * <ul>
     *   <li>Status is not {@code LABEL_ALREADY_EXIST}: the transaction id returned in the response
     *       is aborted and the scan stops.
     *   <li>Status is {@code LABEL_ALREADY_EXIST} and the existing job status is {@code
     *       JOB_EXIST_FINISHED}: a {@link DorisException} is thrown.
     *   <li>Status is {@code LABEL_ALREADY_EXIST} and the job is not finished: the transaction id
     *       is parsed out of the response message with {@code LABEL_EXIST_PATTERN}, that
     *       transaction is aborted, and the scan moves on to the next checkpoint id.
     * </ul>
     *
     * <p>NOTE(review): the object returned by {@code httpClient.execute(...)} is handed straight
     * to {@code handlePreCommitResponse} and is not closed here; confirm it is released there.
     *
     * @param labelPrefix label prefix; used only in log messages by this method
     * @param chkID first checkpoint id to probe
     * @throws DorisException if a label already exists with a finished job, or if the label exists
     *     but no transaction id can be extracted from the response message
     * @throws Exception any other failure raised while probing or aborting; every failure is
     *     logged at WARN level and rethrown unchanged
     */
    public void abortPreCommit(String labelPrefix, long chkID) throws Exception {
        LOG.info(
                "abort for labelPrefix {}, concat labelPrefix {},  start chkId {}.",
                labelPrefix,
                labelGenerator.getConcatLabelPrefix(),
                chkID);
        try {
            // The loop has no exit condition of its own: it ends either via the break below
            // (first label that is not already taken) or via an exception.
            for (long probeChkId = chkID; ; probeChkId++) {
                // TODO: abort the txn directly by label.
                //  For now a txn can only be aborted by its txn id, so a stream load is
                //  requested under the label first in order to learn that id.
                String label = labelGenerator.generateTableLabel(probeChkId);
                LOG.info("start a check label {} to load.", label);

                HttpPutBuilder probeRequest = new HttpPutBuilder();
                probeRequest
                        .setUrl(loadUrlStr)
                        .baseAuth(user, passwd)
                        .addCommonHeader()
                        .enable2PC()
                        .setLabel(label)
                        .setEmptyEntity()
                        .addProperties(streamLoadProp);

                RespContent probeResult =
                        handlePreCommitResponse(httpClient.execute(probeRequest.build()));
                Preconditions.checkState("true".equals(probeResult.getTwoPhaseCommit()));

                boolean labelTaken = LABEL_ALREADY_EXIST.equals(probeResult.getStatus());
                if (!labelTaken) {
                    // The probe itself opened a fresh txn; abort it and stop scanning.
                    LOG.info("abort {} for check label {}.", probeResult.getTxnId(), label);
                    abortTransaction(probeResult.getTxnId());
                    break;
                }

                // Label already exists and its job has finished: nothing can be aborted.
                if (JOB_EXIST_FINISHED.equals(probeResult.getExistingJobStatus())) {
                    throw new DorisException(
                            "Load status is "
                                    + LABEL_ALREADY_EXIST
                                    + " and load job finished, "
                                    + "change you label prefix or restore from latest savepoint!");
                }

                // Label exists but the job is not finished: recover the txn id from the message.
                Matcher existingTxn = LABEL_EXIST_PATTERN.matcher(probeResult.getMessage());
                if (!existingTxn.find()) {
                    LOG.error("response: {}", probeResult.toString());
                    throw new DorisException(
                            "Load Status is "
                                    + LABEL_ALREADY_EXIST
                                    + ", but no txnID associated with it!");
                }
                Preconditions.checkState(label.equals(existingTxn.group(1)));
                long existingTxnId = Long.parseLong(existingTxn.group(2));
                LOG.info("abort {} for exist label {}", existingTxnId, label);
                abortTransaction(existingTxnId);
            }
        } catch (Exception e) {
            LOG.warn("failed to abort labelPrefix {}", labelPrefix, e);
            throw e;
        }
        LOG.info("abort for labelPrefix {} finished", labelPrefix);
    }