in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java [136:182]
/**
 * Aborts transactions left behind under this writer's labels, starting at checkpoint id
 * {@code chkID} and walking forward one checkpoint id at a time.
 *
 * <p>For each checkpoint id the label is generated and probed by sending a two-phase-commit
 * stream load request with an empty entity:
 * <ul>
 *   <li>If the response status is {@code LABEL_ALREADY_EXIST} and the existing job is finished,
 *       a {@link DorisException} is thrown, since the label cannot be reused.</li>
 *   <li>If the response status is {@code LABEL_ALREADY_EXIST} and the existing job is not
 *       finished, the transaction id is parsed out of the response message, that transaction
 *       is aborted, and the next checkpoint id is probed.</li>
 *   <li>Otherwise the label was not taken: the transaction opened by the probe itself is
 *       aborted and the loop stops.</li>
 * </ul>
 *
 * @param labelSuffix label suffix, used only in log messages here
 * @param chkID       first checkpoint id whose label is probed
 * @throws DorisException if a label exists with a finished job, or if the label exists but the
 *                        response message carries no transaction id
 * @throws Exception      any failure from the HTTP call, response handling or abort is logged
 *                        and rethrown unchanged
 */
public void abortPreCommit(String labelSuffix, long chkID) throws Exception {
    long startChkID = chkID;
    LOG.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, chkID);
    while (true) {
        try {
            String label = labelGenerator.generateLabel(startChkID);
            // Probe the label with an empty 2PC load; the response tells us whether it is in use.
            HttpPutBuilder builder = new HttpPutBuilder();
            builder.setUrl(loadUrlStr)
                    .baseAuth(user, passwd)
                    .addCommonHeader()
                    .enable2PC()
                    .setLabel(label)
                    .setEmptyEntity()
                    .addProperties(streamLoadProp);
            RespContent respContent = handlePreCommitResponse(httpClient.execute(builder.build()));
            Preconditions.checkState("true".equals(respContent.getTwoPhaseCommit()));

            if (!LABEL_ALREADY_EXIST.equals(respContent.getStatus())) {
                // Label was free: the probe opened a transaction of its own, abort it and stop.
                LOG.info("abort {} for check label {}.", respContent.getTxnId(), label);
                abortTransaction(respContent.getTxnId());
                break;
            }

            // label already exist and job finished
            if (JOB_EXIST_FINISHED.equals(respContent.getExistingJobStatus())) {
                throw new DorisException("Load status is " + LABEL_ALREADY_EXIST + " and load job finished, " +
                        "change your label prefix or restore from latest savepoint!");
            }

            // job not finished, abort.
            Matcher matcher = LABEL_EXIST_PATTERN.matcher(respContent.getMessage());
            if (!matcher.find()) {
                LOG.error("response: {}", respContent);
                throw new DorisException("Load Status is " + LABEL_ALREADY_EXIST + ", but no txnID associated with it!");
            }
            // Group 1 is expected to echo the probed label, group 2 the transaction id.
            Preconditions.checkState(label.equals(matcher.group(1)));
            long txnId = Long.parseLong(matcher.group(2));
            LOG.info("abort {} for exist label {}", txnId, label);
            abortTransaction(txnId);

            // A later checkpoint may also have left a transaction behind; keep probing.
            startChkID++;
        } catch (Exception e) {
            // Log with abort context (not "stream load") so the failure is attributed correctly.
            LOG.warn("failed to abort labelSuffix {} at chkId {}", labelSuffix, startChkID, e);
            throw e;
        }
    }
    LOG.info("abort for labelSuffix {} finished", labelSuffix);
}