in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java [328:397]
/**
 * Checks whether the pending stream load of one table finished while the writer still
 * considers that table to be loading, and reacts to the premature completion.
 *
 * <p>In the normal flow the load future is completed and checked in prepareCommit(); a future
 * that is already done here while loading is still flagged means the load ended during
 * writing.
 *
 * <ul>
 *   <li>With cache enabled: a backend is re-selected, the pre-committed transaction of the
 *       current checkpoint is aborted (2PC only) and a new stream load is started.
 *   <li>Without cache: the load response is inspected. Under 2PC a "label already exists"
 *       response causes that transaction to be aborted and the method returns; any other
 *       outcome is recorded in {@code loadException} and {@code executorThread} is
 *       interrupted.
 * </ul>
 *
 * @param tableIdentifier identifier of the table whose load is checked; used as the key into
 *     {@code loadingMap} and for label generation
 * @param dorisStreamLoad stream load handle of that table
 * @throws DorisRuntimeException if aborting the previous transaction or starting the new
 *     stream load fails in the cache-enabled path
 */
private void checkAllDone(String tableIdentifier, DorisStreamLoad dorisStreamLoad) {
    // the load future is done and checked in prepareCommit().
    // this will check error while loading.
    if (dorisStreamLoad.getPendingLoadFuture() == null
            || !dorisStreamLoad.getPendingLoadFuture().isDone()) {
        return;
    }

    // Boolean.TRUE.equals() treats a table without an entry in loadingMap as "not loading"
    // instead of throwing a NullPointerException while unboxing the missing value.
    if (!globalLoading || !Boolean.TRUE.equals(loadingMap.get(tableIdentifier))) {
        LOG.debug(
                "not loading, skip timer checker for table {}, {}",
                tableIdentifier,
                globalLoading);
        return;
    }

    // double-check the future, to avoid getting the old future
    if (dorisStreamLoad.getPendingLoadFuture() == null
            || !dorisStreamLoad.getPendingLoadFuture().isDone()) {
        return;
    }

    // error happened when loading, now we should stop receive data
    // and abort previous txn(stream load) and start a new txn(stream load)
    if (executionOptions.isUseCache()) {
        try {
            dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend(subtaskId));
            if (executionOptions.enabled2PC()) {
                dorisStreamLoad.abortPreCommit(labelPrefix, curCheckpointId);
            }
            // start a new txn(stream load)
            LOG.info(
                    "getting exception, breakpoint resume for checkpoint ID: {}, table {}",
                    curCheckpointId,
                    tableIdentifier);
            LabelGenerator labelGenerator = getLabelGenerator(tableIdentifier);
            // NOTE(review): the `true` argument presumably makes the new load resend the
            // cached data - confirm against DorisStreamLoad.startLoad.
            dorisStreamLoad.startLoad(
                    labelGenerator.generateTableLabel(curCheckpointId), true);
        } catch (Exception e) {
            throw new DorisRuntimeException(e);
        }
        return;
    }

    String errorMsg;
    try {
        RespContent content = dorisStreamLoad.getPendingLoadFuture().get();
        if (executionOptions.enabled2PC()
                && LoadStatus.LABEL_ALREADY_EXIST.equals(content.getStatus())) {
            LOG.info("try to abort {} cause Label Already Exists", content.getLabel());
            dorisStreamLoad.abortLabelExistTransaction(content);
            LOG.info("Exist label abort finished, retry");
            // no failure is recorded in this case; the worker thread is not interrupted
            return;
        }
        errorMsg = content.getMessage();
        loadException = new StreamLoadException(errorMsg);
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // keep the interrupt visible to whoever owns this thread
            Thread.currentThread().interrupt();
        }
        errorMsg = e.getMessage();
        loadException = new DorisRuntimeException(e);
    }

    LOG.error(
            "table {} stream load finished unexpectedly, interrupt worker thread! {}",
            tableIdentifier,
            errorMsg);
    // set the executor thread interrupted in case blocking in write data.
    executorThread.interrupt();
}