in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java [134:168]
public void abortLingeringTransactions(Collection<DorisWriterState> recoveredStates)
throws Exception {
List<String> alreadyAborts = new ArrayList<>();
// abort label in state
for (DorisWriterState state : recoveredStates) {
LOG.info("try to abort txn from DorisWriterState {}", state.toString());
// Todo: When the sink parallelism is reduced,
// the txn of the redundant task before aborting is also needed.
if (!state.getLabelPrefix().equals(labelPrefix)) {
LOG.warn(
"Label prefix from previous execution {} has changed to {}.",
state.getLabelPrefix(),
executionOptions.getLabelPrefix());
}
if (state.getDatabase() == null || state.getTable() == null) {
LOG.warn(
"Transactions cannot be aborted when restore because the last used flink-doris-connector version less than 1.5.0.");
continue;
}
String key = state.getDatabase() + "." + state.getTable();
DorisStreamLoad streamLoader = getStreamLoader(key);
streamLoader.abortPreCommit(state.getLabelPrefix(), curCheckpointId);
alreadyAborts.add(state.getLabelPrefix());
}
// TODO: In a multi-table scenario, if do not restore from checkpoint,
// when modify labelPrefix at startup, we cannot abort the previous label.
if (!alreadyAborts.contains(labelPrefix)
&& StringUtils.isNotEmpty(dorisOptions.getTableIdentifier())
&& StringUtils.isNotEmpty(labelPrefix)) {
// abort current labelPrefix
DorisStreamLoad streamLoader = getStreamLoader(dorisOptions.getTableIdentifier());
streamLoader.abortPreCommit(labelPrefix, curCheckpointId);
}
}