in spark-load/spark-load-core/src/main/java/org/apache/doris/load/job/PullLoader.java [122:173]
/**
 * Reports the final job status (including the dpp result and output-file sizes) to the
 * Doris FE, then polls the FE until the ingestion load reaches a terminal state.
 *
 * <p>On {@code FINISHED} the output path is cleaned up (best effort) and the method
 * returns; on {@code CANCELLED} a {@link SparkLoadException} is thrown with the FE's
 * failure message; any other state is logged and re-polled after a 15s pause.
 *
 * @throws SparkLoadException if the dpp result cannot be fetched, status serialization
 *                            fails, or the load job is cancelled by the FE
 */
public void afterFinished() throws SparkLoadException {
    DorisClient.FeClient feClient = DorisClient.getFeClient(jobConfig.getFeAddresses(), jobConfig.getUser(),
            jobConfig.getPassword());
    statusInfo.put("status", jobStatus.name());
    statusInfo.put("msg", "");
    statusInfo.put("appId", appHandle == null ? null : appHandle.getAppId());
    statusInfo.put("storageType", jobConfig.getStorageType().name());
    try {
        String dppResultStr = null;
        // Retry a few times: the dpp result may not be readable immediately after the
        // Spark application finishes. Bounding the loop on the attempt count (instead
        // of incrementing only inside the catch, as before) guarantees termination even
        // if getDppResultString() returns null without throwing; on exception paths the
        // call/park sequence is identical to the original (up to 3 calls, 500ms apart).
        for (int attempt = 0; attempt < 3 && dppResultStr == null; attempt++) {
            try {
                dppResultStr = getDppResultString();
            } catch (UnsupportedOperationException e) {
                LOG.warn("retry get dpp result", e);
            }
            if (dppResultStr == null) {
                LockSupport.parkNanos(Duration.ofMillis(500).toNanos());
            }
        }
        if (dppResultStr == null) {
            throw new SparkLoadException("get dpp result str failed");
        }
        statusInfo.put("dppResult", dppResultStr);
        statusInfo.put("filePathToSize", JsonUtils.writeValueAsString(getFilePathToSize()));
        statusInfo.put("hadoopProperties", JsonUtils.writeValueAsString(getHadoopProperties()));
    } catch (IOException e) {
        throw new SparkLoadException("update job status failed", e);
    }
    feClient.updateIngestionLoad(jobConfig.getDatabase(), loadMeta.getLoadId(), statusInfo);
    // Poll the FE every 15s until the load reaches a terminal state.
    // NOTE(review): there is no overall timeout here — a load that never finishes keeps
    // this loop (and the calling thread) alive indefinitely; confirm that is intended.
    do {
        LoadInfo loadInfo = feClient.getLoadInfo(jobConfig.getDatabase(), jobConfig.getLabel());
        switch (loadInfo.getState().toUpperCase(Locale.ROOT)) {
            case "FINISHED":
                LOG.info("load job finished.");
                try {
                    cleanOutputPath();
                } catch (IOException e) {
                    // Best-effort cleanup: a failure here is logged but does not fail
                    // an otherwise successful load.
                    LOG.warn("clean output path failed", e);
                }
                return;
            case "CANCELLED":
                throw new SparkLoadException("load job failed, " + loadInfo.getFailMsg());
            default:
                LOG.info("load job unfinished, state: " + loadInfo.getState());
                break;
        }
        LockSupport.parkNanos(Duration.ofSeconds(15).toNanos());
    } while (true);
}