public void abortLingeringTransactions()

in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java [134:168]


    /**
     * Requests an abort of pre-committed transactions for every recovered writer state and,
     * when no recovered state used the current label prefix, for the current label prefix on
     * the configured table identifier as well.
     *
     * <p>Recovered states that carry no database or table are skipped with a warning, because
     * the stream loader to use is looked up by the {@code database.table} key.
     *
     * @param recoveredStates writer states handed back on restore; each one is inspected in
     *     iteration order
     * @throws Exception propagated from the stream loader lookup or the abort request
     */
    public void abortLingeringTransactions(Collection<DorisWriterState> recoveredStates)
            throws Exception {
        // Label prefixes for which an abort has already been requested in this call.
        List<String> abortedPrefixes = new ArrayList<>();

        for (DorisWriterState state : recoveredStates) {
            LOG.info("try to abort txn from DorisWriterState {}", state.toString());
            String recoveredPrefix = state.getLabelPrefix();

            // TODO: when the sink parallelism is reduced, the transactions of the
            //  now-redundant tasks also need to be aborted.
            if (!recoveredPrefix.equals(labelPrefix)) {
                LOG.warn(
                        "Label prefix from previous execution {} has changed to {}.",
                        recoveredPrefix,
                        executionOptions.getLabelPrefix());
            }

            // Both parts are required to build the stream loader key below.
            boolean hasTargetTable = state.getDatabase() != null && state.getTable() != null;
            if (hasTargetTable) {
                String loaderKey = state.getDatabase() + "." + state.getTable();
                getStreamLoader(loaderKey).abortPreCommit(recoveredPrefix, curCheckpointId);
                abortedPrefixes.add(recoveredPrefix);
            } else {
                LOG.warn(
                        "Transactions cannot be aborted when restore because the last used flink-doris-connector version less than 1.5.0.");
            }
        }

        // The current label prefix was already handled through a recovered state.
        if (abortedPrefixes.contains(labelPrefix)) {
            return;
        }

        // TODO: in a multi-table scenario, if the job is not restored from a checkpoint and
        //  labelPrefix is modified at startup, the previous label cannot be aborted.
        boolean canAbortCurrentPrefix =
                StringUtils.isNotEmpty(dorisOptions.getTableIdentifier())
                        && StringUtils.isNotEmpty(labelPrefix);
        if (canAbortCurrentPrefix) {
            // Abort whatever is pending under the current label prefix.
            getStreamLoader(dorisOptions.getTableIdentifier())
                    .abortPreCommit(labelPrefix, curCheckpointId);
        }
    }