public void afterFinished()

in spark-load/spark-load-core/src/main/java/org/apache/doris/load/job/PullLoader.java [122:173]


    /**
     * Reports the final job status to the Doris FE and then waits for the load job to reach a
     * terminal state.
     *
     * <p>The status info is filled with the job status name, the application id ({@code null} when
     * there is no app handle), the storage type name, the dpp result string, the file-path-to-size
     * mapping and the hadoop properties, and is sent through {@code updateIngestionLoad}.
     * Afterwards the load state is polled every 15 seconds until it is {@code FINISHED} (the output
     * path is then cleaned on a best-effort basis) or {@code CANCELLED}.
     *
     * @throws SparkLoadException if no dpp result string is obtained within 3 attempts, if building
     *         the status info fails with an {@link IOException}, if the load job is cancelled, or
     *         if the thread is interrupted while waiting for the load job to finish
     */
    public void afterFinished() throws SparkLoadException {
        DorisClient.FeClient feClient = DorisClient.getFeClient(jobConfig.getFeAddresses(), jobConfig.getUser(),
                jobConfig.getPassword());
        statusInfo.put("status", jobStatus.name());
        statusInfo.put("msg", "");
        statusInfo.put("appId", appHandle == null ? null : appHandle.getAppId());
        statusInfo.put("storageType", jobConfig.getStorageType().name());
        try {
            final int maxAttempts = 3;
            String dppResultStr = null;
            // Every attempt counts against the limit, whether it failed with an exception or just
            // produced no result; otherwise a null result without an exception would loop forever.
            for (int attempt = 1; attempt <= maxAttempts && dppResultStr == null; attempt++) {
                try {
                    dppResultStr = getDppResultString();
                } catch (UnsupportedOperationException e) {
                    LOG.warn("retry get dpp result", e);
                }
                // Back off only when another attempt will follow.
                if (dppResultStr == null && attempt < maxAttempts) {
                    LockSupport.parkNanos(Duration.ofMillis(500).toNanos());
                }
            }
            if (dppResultStr == null) {
                throw new SparkLoadException("get dpp result str failed");
            }
            statusInfo.put("dppResult", dppResultStr);
            statusInfo.put("filePathToSize", JsonUtils.writeValueAsString(getFilePathToSize()));
            statusInfo.put("hadoopProperties", JsonUtils.writeValueAsString(getHadoopProperties()));
        } catch (IOException e) {
            throw new SparkLoadException("update job status failed", e);
        }
        feClient.updateIngestionLoad(jobConfig.getDatabase(), loadMeta.getLoadId(), statusInfo);
        while (true) {
            LoadInfo loadInfo = feClient.getLoadInfo(jobConfig.getDatabase(), jobConfig.getLabel());
            switch (loadInfo.getState().toUpperCase(Locale.ROOT)) {
                case "FINISHED":
                    LOG.info("load job finished.");
                    try {
                        cleanOutputPath();
                    } catch (IOException e) {
                        // Cleanup is best effort: the load itself has already succeeded.
                        LOG.warn("clean output path failed", e);
                    }
                    return;
                case "CANCELLED":
                    throw new SparkLoadException("load job failed, " + loadInfo.getFailMsg());
                default:
                    LOG.info("load job unfinished, state: " + loadInfo.getState());
                    break;
            }
            LockSupport.parkNanos(Duration.ofSeconds(15).toNanos());
            // parkNanos returns immediately while the interrupt flag is set and does not clear it,
            // so without this check an interrupted thread would poll the FE in a tight loop.
            // The flag is left set for the caller to observe.
            if (Thread.currentThread().isInterrupted()) {
                throw new SparkLoadException("interrupted while waiting for load job to finish");
            }
        }
    }