in spark-load/spark-load-core/src/main/java/org/apache/doris/load/job/PullLoader.java [208:303]
/**
 * Checks whether the output of a previous etl job can be reused to recover this
 * load instead of re-running the etl phase.
 * <p>
 * Recovery is possible only when recovery mode is enabled, exactly one previous
 * job output directory exists under the parent output path, that directory's
 * dpp result file reports a successful run, and the persisted load meta
 * (indexes and partitions) is still consistent with the current {@code loadMeta}.
 *
 * @return true if a previous, successful, and meta-consistent job output exists;
 *         false otherwise
 * @throws SparkLoadException if the file system cannot be accessed during the check
 */
public boolean canBeRecovered() throws SparkLoadException {
    if (!isRecoveryMode) {
        return false;
    }
    String outputPath = etlJobConfig.getOutputPath();
    // NOTE(review): assumes outputPath always contains '/'; if lastIndexOf
    // returned -1 this substring would throw StringIndexOutOfBoundsException —
    // confirm the output path format upstream.
    String parentOutputPath = outputPath.substring(0, StringUtils.lastIndexOf(outputPath, "/"));
    try {
        if (!FileSystemUtils.exists(jobConfig, parentOutputPath)) {
            return false;
        }
        FileStatus[] fileStatuses = FileSystemUtils.list(jobConfig, parentOutputPath);
        // Recovery requires exactly one previous job output directory.
        if (fileStatuses.length != 1) {
            return false;
        }
        fileStatuses = FileSystemUtils.list(jobConfig, fileStatuses[0].getPath().toString());
        boolean hasDppResult = false;
        for (FileStatus fileStatus : fileStatuses) {
            String fileName = fileStatus.getPath().getName();
            if (DPP_RESULT_JSON.equalsIgnoreCase(fileName)) {
                hasDppResult = true;
                if (!isDppResultRecoverable(fileStatus)) {
                    return false;
                }
            }
            // check meta consist
            if (LOAD_META_JSON.equalsIgnoreCase(fileName)) {
                if (!isLoadMetaConsistent(fileStatus)) {
                    return false;
                }
            }
        }
        // Without a dpp result file there is nothing to recover from.
        return hasDppResult;
    } catch (IOException e) {
        throw new SparkLoadException("check recovery failed", e);
    }
}

/**
 * Reads the previous job's dpp result file and checks that it is non-empty
 * and reports a successful etl run.
 *
 * @param fileStatus file status of the dpp result json
 * @return true if the previous etl run succeeded; false otherwise
 * @throws IOException if the file cannot be read or parsed
 */
private boolean isDppResultRecoverable(FileStatus fileStatus) throws IOException {
    String content = FileSystemUtils.readFile(jobConfig, fileStatus.getPath().toString());
    if (StringUtils.isBlank(content)) {
        return false;
    }
    DppResult dppResult = JsonUtils.readValue(content, DppResult.class);
    if (!checkDppResult(dppResult)) {
        LOG.info("previous etl job is failed, cannot be recovered");
        return false;
    }
    return true;
}

/**
 * Compares the previous load meta against the current {@code loadMeta}: every
 * current table must exist in the old meta with the same index count, matching
 * schemaHash/schemaVersion per index, and an identical partition id set.
 *
 * @param fileStatus file status of the load meta json
 * @return true if old and current meta are consistent; false otherwise
 * @throws IOException if the file cannot be read or parsed
 */
private boolean isLoadMetaConsistent(FileStatus fileStatus) throws IOException {
    String content = FileSystemUtils.readFile(jobConfig, fileStatus.getPath().toString());
    if (StringUtils.isBlank(content)) {
        return false;
    }
    LoadMeta oldLoadMeta = JsonUtils.readValue(content, LoadMeta.class);
    for (Map.Entry<String, TableMeta> entry : loadMeta.getTableMeta().entrySet()) {
        TableMeta tableMeta = entry.getValue();
        TableMeta oldTableMeta = oldLoadMeta.getTableMeta().get(entry.getKey());
        // index count is not consistent
        if (oldTableMeta == null
                || oldTableMeta.getIndexes().size() != tableMeta.getIndexes().size()) {
            LOG.info("index size mismatch, cannot be recovered");
            return false;
        }
        if (!hasConsistentIndexes(tableMeta, oldTableMeta)) {
            return false;
        }
        if (!hasConsistentPartitions(tableMeta, oldTableMeta)) {
            return false;
        }
    }
    return true;
}

/**
 * Checks that every index of the current table meta exists in the old table
 * meta with the same schemaHash and schemaVersion.
 */
private boolean hasConsistentIndexes(TableMeta tableMeta, TableMeta oldTableMeta) {
    Map<Long, EtlJobConfig.EtlIndex> indexMap = tableMeta.getIndexes().stream()
            .collect(Collectors.toMap(etlIndex -> etlIndex.indexId,
                    TableMeta.EtlIndex::toEtlIndex));
    Map<Long, EtlJobConfig.EtlIndex> oldIndexMap = oldTableMeta.getIndexes().stream()
            .collect(Collectors.toMap(etlIndex -> etlIndex.indexId,
                    TableMeta.EtlIndex::toEtlIndex));
    for (Map.Entry<Long, EtlJobConfig.EtlIndex> indexEntry : indexMap.entrySet()) {
        EtlJobConfig.EtlIndex index = indexEntry.getValue();
        EtlJobConfig.EtlIndex oldIndex = oldIndexMap.get(indexEntry.getKey());
        // index not exists
        if (oldIndex == null) {
            LOG.info("index " + index.indexId + " is not exists in previous meta");
            return false;
        }
        // index mismatch
        if (oldIndex.schemaHash != index.schemaHash
                || oldIndex.schemaVersion != index.schemaVersion) {
            LOG.info("index " + index.indexId + " has changed, "
                    + "old schemaHash: " + oldIndex.schemaHash + " and schemaVersion: "
                    + oldIndex.schemaVersion + " current schemaHash: "
                    + index.schemaHash + " and schemaVersion: "
                    + index.schemaVersion + ", cannot be recovered");
            return false;
        }
    }
    return true;
}

/**
 * Checks that the current table meta's partition id set is identical to the
 * old table meta's (same size and same ids).
 */
private boolean hasConsistentPartitions(TableMeta tableMeta, TableMeta oldTableMeta) {
    Set<Long> partitionSet = tableMeta.getPartitionInfo().partitions.stream().map(
            p -> p.partitionId).collect(Collectors.toSet());
    Set<Long> oldPartitionSet = oldTableMeta.getPartitionInfo().partitions.stream().map(
            p -> p.partitionId).collect(Collectors.toSet());
    if (oldPartitionSet.size() != partitionSet.size()) {
        LOG.info("partition size mismatch, old partition size: " + oldPartitionSet.size()
                + ", now partition size: " + partitionSet.size()
                + ", cannot be recovered");
        return false;
    }
    for (Long partitionId : partitionSet) {
        if (!oldPartitionSet.contains(partitionId)) {
            LOG.info("partition id mismatch, partition id: " + partitionId
                    + ", cannot be recovered");
            return false;
        }
    }
    return true;
}