private StateStoreLoadResult loadStateStoreImpl()

in src/main/java/com/uber/rss/execution/ShuffleExecutor.java [591:646]


    /**
     * Replays the items held by {@code stateStore} and applies the outcome to this
     * executor's in-memory maps.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Every item from {@code stateStore.loadData()} is handed to
     *       {@code loadStateImpl} together with four accumulator sets. Reading stops
     *       early once more than {@code MAX_STATE_LOAD_MILLIS} have elapsed since the
     *       method started; in that case {@code statePartialLoads} is incremented.
     *       The iterator is closed whether or not reading finished normally.</li>
     *   <li>Each accumulated corrupted stage that is present in {@code stageStates}
     *       is flagged via {@code setFileCorrupted()} and recorded again through
     *       {@code stateStore.storeStageCorruption}.</li>
     *   <li>For each accumulated deleted app, all of its entries are removed from
     *       {@code stageStates} and the deletion is recorded again through
     *       {@code stateStore.storeAppDeletion}.</li>
     *   <li>{@code stateStore.commit()} is called.</li>
     *   <li>For each remaining (non-deleted) app id, a fresh {@code ExecutorAppState}
     *       with an updated liveness timestamp is put into {@code appStates}.</li>
     * </ol>
     *
     * @return a summary holding: whether the load was cut short, the number of items
     *         read, the number of non-deleted apps, the number of deleted apps, the
     *         number of stages seen, the number of corrupted stages, and the number
     *         of stage entries removed from {@code stageStates}
     */
    private StateStoreLoadResult loadStateStoreImpl() {
        final long loadStartMillis = System.currentTimeMillis();
        boolean timedOut = false;
        long itemsRead = 0;

        // Accumulators handed to loadStateImpl for every item.
        // NOTE(review): presumably loadStateImpl fills these sets — confirm against its body.
        Set<String> seenAppIds = new HashSet<>();
        Set<String> deletedAppIds = new HashSet<>();
        Set<AppShuffleId> seenStages = new HashSet<>();
        Set<AppShuffleId> corruptedStageIds = new HashSet<>();

        LocalFileStateStoreIterator storedItems = stateStore.loadData();
        try {
            // The time budget is checked after each item, so at least one item is
            // processed whenever the store is non-empty.
            while (!timedOut && storedItems.hasNext()) {
                BaseMessage message = storedItems.next();
                loadStateImpl(message, seenAppIds, deletedAppIds, seenStages, corruptedStageIds);
                itemsRead++;

                long elapsedMillis = System.currentTimeMillis() - loadStartMillis;
                if (elapsedMillis > MAX_STATE_LOAD_MILLIS) {
                    timedOut = true;
                    statePartialLoads.inc(1);
                }
            }
        } finally {
            storedItems.close();
        }

        // Flag corrupted stages that are currently tracked; untracked ones are skipped.
        for (AppShuffleId corruptedId : corruptedStageIds) {
            ExecutorShuffleStageState trackedStage = stageStates.get(corruptedId);
            if (trackedStage == null) {
                continue;
            }
            trackedStage.setFileCorrupted();
            stateStore.storeStageCorruption(trackedStage.getAppShuffleId());
        }

        // Drop every tracked stage that belongs to a deleted app.
        int removedStageCount = 0;
        for (String deletedAppId : deletedAppIds) {
            // Snapshot the matching keys first so removal does not happen while
            // the key set is being streamed.
            List<AppShuffleId> stagesOfDeletedApp = stageStates.keySet().stream()
                .filter(stageKey -> stageKey.getAppId().equals(deletedAppId))
                .collect(Collectors.toList());
            for (AppShuffleId stageToRemove : stagesOfDeletedApp) {
                stageStates.remove(stageToRemove);
            }
            removedStageCount += stagesOfDeletedApp.size();
            stateStore.storeAppDeletion(deletedAppId);
        }

        stateStore.commit();

        // Only apps that were not deleted get an in-memory app state.
        seenAppIds.removeAll(deletedAppIds);
        for (String liveAppId : seenAppIds) {
            ExecutorAppState liveAppState = new ExecutorAppState(liveAppId);
            liveAppState.updateLivenessTimestamp();
            appStates.put(liveAppId, liveAppState);
        }

        return new StateStoreLoadResult(
            timedOut,
            itemsRead,
            seenAppIds.size(),
            deletedAppIds.size(),
            seenStages.size(),
            corruptedStageIds.size(),
            removedStageCount);
    }