public boolean canBeRecovered()

in spark-load/spark-load-core/src/main/java/org/apache/doris/load/job/PullLoader.java [208:303]
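
In recovery mode, this method decides whether the output of a previous ETL run can be reused: the parent of the configured output path must contain exactly one job directory, that directory must hold a non-empty DPP result that passes checkDppResult, and, if a load meta file is present, its indexes (count, schemaHash, schemaVersion) and partition ids must match the current load meta. Based on the listing, the expected layout is roughly the following (the concrete file names are assumptions derived from the DPP_RESULT_JSON and LOAD_META_JSON constants):

    <parent of etlJobConfig.getOutputPath()>/
        <single previous output directory>/   // exactly one entry is required
            dpp_result.json                   // assumed value of DPP_RESULT_JSON
            load_meta.json                    // assumed value of LOAD_META_JSON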


    public boolean canBeRecovered() throws SparkLoadException {
        if (isRecoveryMode) {
            String outputPath = etlJobConfig.getOutputPath();
            String parentOutputPath = outputPath.substring(0, StringUtils.lastIndexOf(outputPath, "/"));
            try {
                if (FileSystemUtils.exists(jobConfig, parentOutputPath)) {
                    FileStatus[] fileStatuses = FileSystemUtils.list(jobConfig, parentOutputPath);
                    // recovery requires exactly one previous etl output directory under the parent path
                    if (fileStatuses.length != 1) {
                        return false;
                    }
                    // list the contents of the single previous etl output directory
                    fileStatuses = FileSystemUtils.list(jobConfig, fileStatuses[0].getPath().toString());
                    boolean hasDppResult = false;
                    for (FileStatus fileStatus : fileStatuses) {
                        String fileName = fileStatus.getPath().getName();
                        if (DPP_RESULT_JSON.equalsIgnoreCase(fileName)) {
                            hasDppResult = true;
                            String content = FileSystemUtils.readFile(jobConfig, fileStatus.getPath().toString());
                            if (StringUtils.isBlank(content)) {
                                return false;
                            }
                            DppResult dppResult = JsonUtils.readValue(content, DppResult.class);
                            if (!checkDppResult(dppResult)) {
                                LOG.info("previous etl job is failed, cannot be recovered");
                                return false;
                            }
                        }
                        // check meta consistency
                        if (LOAD_META_JSON.equalsIgnoreCase(fileName)) {
                            String content = FileSystemUtils.readFile(jobConfig, fileStatus.getPath().toString());
                            if (StringUtils.isBlank(content)) {
                                return false;
                            }
                            LoadMeta oldLoadMeta = JsonUtils.readValue(content, LoadMeta.class);
                            for (Map.Entry<String, TableMeta> entry : loadMeta.getTableMeta().entrySet()) {
                                TableMeta tableMeta = entry.getValue();
                                TableMeta oldTableMeta = oldLoadMeta.getTableMeta().get(entry.getKey());
                                // table missing in previous meta, or index count mismatch
                                if (oldTableMeta == null
                                        || oldTableMeta.getIndexes().size() != tableMeta.getIndexes().size()) {
                                    LOG.info("index size mismatch, cannot be recovered");
                                    return false;
                                }
                                Map<Long, EtlJobConfig.EtlIndex> indexMap = tableMeta.getIndexes().stream()
                                        .collect(Collectors.toMap(etlIndex -> etlIndex.indexId,
                                                TableMeta.EtlIndex::toEtlIndex));
                                Map<Long, EtlJobConfig.EtlIndex> oldIndexMap = oldTableMeta.getIndexes().stream()
                                        .collect(Collectors.toMap(etlIndex -> etlIndex.indexId,
                                                TableMeta.EtlIndex::toEtlIndex));
                                for (Map.Entry<Long, EtlJobConfig.EtlIndex> indexEntry : indexMap.entrySet()) {
                                    EtlJobConfig.EtlIndex index = indexEntry.getValue();
                                    EtlJobConfig.EtlIndex oldIndex = oldIndexMap.get(indexEntry.getKey());
                                    // index does not exist in previous meta
                                    if (oldIndex == null) {
                                        LOG.info("index " + index.indexId + " does not exist in previous meta");
                                        return false;
                                    }
                                    // index mismatch
                                    if (oldIndex.schemaHash != index.schemaHash
                                            || oldIndex.schemaVersion != index.schemaVersion) {
                                        LOG.info("index " + index.indexId + " has changed, "
                                                + "old schemaHash: " + oldIndex.schemaHash + " and schemaVersion: "
                                                + oldIndex.schemaVersion + " current schemaHash: "
                                                + index.schemaHash + " and schemaVersion: "
                                                + index.schemaVersion + ", cannot be recovered");
                                        return false;
                                    }
                                }
                                // check partition consistency
                                Set<Long> partitionSet = tableMeta.getPartitionInfo().partitions.stream().map(
                                        p -> p.partitionId).collect(Collectors.toSet());
                                Set<Long> oldPartitionSet = oldTableMeta.getPartitionInfo().partitions.stream().map(
                                        p -> p.partitionId).collect(Collectors.toSet());
                                if (oldPartitionSet.size() != partitionSet.size()) {
                                    LOG.info("partition size mismatch, old partition size: " + oldPartitionSet.size()
                                            + ", now partition size: " + partitionSet.size()
                                            + ", cannot be recovered");
                                    return false;
                                }
                                for (Long partitionId : partitionSet) {
                                    if (!oldPartitionSet.contains(partitionId)) {
                                        LOG.info("partition id mismatch, partition id: " + partitionId
                                                + ", cannot be recovered");
                                        return false;
                                    }
                                }
                            }
                        }
                    }
                    return hasDppResult;
                }
            } catch (IOException e) {
                throw new SparkLoadException("check recovery failed", e);
            }
        }
        return false;
    }
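
One way a caller might act on this check (a minimal sketch; recover() and submitEtlJob() are placeholder names, not methods shown in this listing): in recovery mode, a successful check lets the loader reuse the previous ETL output instead of resubmitting the Spark job.

    // Hypothetical caller; recover() and submitEtlJob() are illustrative names only.
    if (loader.canBeRecovered()) {
        LOG.info("previous etl output is reusable, skip submitting a new etl job");
        loader.recover();        // reuse the existing dpp result and load meta
    } else {
        loader.submitEtlJob();   // previous output unusable, run the spark etl job again
    }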