private void checkAllDone()

in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java [328:397]


    /**
     * Inspects the pending load future of one table's stream load and reacts when that future
     * has already completed while the writer still considers the table to be loading.
     *
     * <p>If {@code executionOptions.isUseCache()} is true, a backend is re-selected for this
     * subtask, the pre-committed transaction of the current checkpoint is aborted when 2PC is
     * enabled, and a new stream load is started for the current checkpoint. Otherwise the
     * future's result is read: with 2PC enabled and a {@code LABEL_ALREADY_EXIST} status the
     * existing-label transaction is aborted and the method returns; any other outcome is stored
     * in {@code loadException} and {@code executorThread} is interrupted.
     *
     * @param tableIdentifier key used to look up the table's loading flag and label generator
     * @param dorisStreamLoad stream load whose pending future is inspected
     * @throws DorisRuntimeException if restarting the stream load in cache mode fails
     */
    private void checkAllDone(String tableIdentifier, DorisStreamLoad dorisStreamLoad) {
        // the load future is done and checked in prepareCommit().
        // this will check error while loading.
        if (dorisStreamLoad.getPendingLoadFuture() != null
                && dorisStreamLoad.getPendingLoadFuture().isDone()) {
            // A table without an entry in loadingMap is treated as "not loading": comparing
            // against Boolean.TRUE avoids a NullPointerException from unboxing a missing value.
            if (!globalLoading || !Boolean.TRUE.equals(loadingMap.get(tableIdentifier))) {
                LOG.debug(
                        "not loading, skip timer checker for table {}, {}",
                        tableIdentifier,
                        globalLoading);
                return;
            }

            // double-check the future, to avoid getting the old future
            if (dorisStreamLoad.getPendingLoadFuture() != null
                    && dorisStreamLoad.getPendingLoadFuture().isDone()) {
                // error happened when loading, now we should stop receive data
                // and abort previous txn(stream load) and start a new txn(stream load)
                // use send cached data to new txn, then notify to restart the stream
                if (executionOptions.isUseCache()) {
                    try {
                        dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend(subtaskId));
                        if (executionOptions.enabled2PC()) {
                            dorisStreamLoad.abortPreCommit(labelPrefix, curCheckpointId);
                        }
                        // start a new txn(stream load)
                        LOG.info(
                                "getting exception, breakpoint resume for checkpoint ID: {}, table {}",
                                curCheckpointId,
                                tableIdentifier);
                        LabelGenerator labelGenerator = getLabelGenerator(tableIdentifier);
                        dorisStreamLoad.startLoad(
                                labelGenerator.generateTableLabel(curCheckpointId), true);
                    } catch (Exception e) {
                        throw new DorisRuntimeException(e);
                    }
                } else {
                    String errorMsg;
                    try {
                        RespContent content = dorisStreamLoad.getPendingLoadFuture().get();
                        if (executionOptions.enabled2PC()
                                && LoadStatus.LABEL_ALREADY_EXIST.equals(content.getStatus())) {
                            LOG.info(
                                    "try to abort {} cause Label Already Exists",
                                    content.getLabel());
                            dorisStreamLoad.abortLabelExistTransaction(content);
                            errorMsg = "Exist label abort finished, retry";
                            LOG.info(errorMsg);
                            return;
                        } else {
                            errorMsg = content.getMessage();
                            loadException = new StreamLoadException(errorMsg);
                        }
                    } catch (Exception e) {
                        if (e instanceof InterruptedException) {
                            // the broad catch would otherwise drop the interrupt status of the
                            // thread running this check; restore it for the thread's owner
                            Thread.currentThread().interrupt();
                        }
                        errorMsg = e.getMessage();
                        loadException = new DorisRuntimeException(e);
                    }

                    LOG.error(
                            "table {} stream load finished unexpectedly, interrupt worker thread! {}",
                            tableIdentifier,
                            errorMsg);

                    // set the executor thread interrupted in case blocking in write data.
                    executorThread.interrupt();
                }
            }
        }
    }