private void loadStateImpl()

in src/main/java/com/uber/rss/execution/ShuffleExecutor.java [648:741]


    /**
     * Applies one item read back from the state store to this executor's in-memory state and to
     * the bookkeeping sets supplied by the caller.
     *
     * <p>Handling per item type:
     * <ul>
     *   <li>{@code StageInfoStateItem}: records the app and stage. If {@code stageStates} has no
     *       entry for the stage, one is created from the loaded values. Otherwise the loaded
     *       values are compared with the existing entry: a different partition count or write
     *       config marks the stage corrupted and increments {@code stateLoadWarnings}, and the
     *       file start index is raised when the loaded one is larger. The resulting stage info is
     *       then written back through {@code stateStore}.</li>
     *   <li>{@code AppDeletionStateItem}: records the app id in both {@code appIds} and
     *       {@code deletedApps}.</li>
     *   <li>{@code TaskAttemptCommitStateItem}: records the app and stage, then commits the listed
     *       map task attempts and adds the finalized files on the existing stage state. If there
     *       is no stage state, the stage is added to {@code corruptedStages} and a warning is
     *       counted.</li>
     *   <li>{@code StageCorruptionStateItem}: only adds the stage to {@code corruptedStages}; no
     *       stage state is touched here (NOTE(review): presumably the caller or a later item
     *       applies the flag - confirm against the caller).</li>
     *   <li>Anything else: counted in {@code stateLoadWarnings} and logged.</li>
     * </ul>
     *
     * @param stateItem       the state item to apply
     * @param appIds          receives the id of every app referenced by stage info, app deletion
     *                        and task attempt commit items
     * @param deletedApps     receives the ids of apps that have an app deletion item
     * @param stages          receives every stage referenced by stage info and task attempt
     *                        commit items
     * @param corruptedStages receives every stage found to be corrupted; also read to propagate a
     *                        corruption flag recorded by an earlier item
     */
    private void loadStateImpl(
            BaseMessage stateItem,
            Set<String> appIds,
            Set<String> deletedApps,
            Set<AppShuffleId> stages,
            Set<AppShuffleId> corruptedStages) {
        if (stateItem instanceof StageInfoStateItem) {
            StageInfoStateItem stageInfoStateItem = (StageInfoStateItem) stateItem;
            AppShuffleId appShuffleId = stageInfoStateItem.getAppShuffleId();
            appIds.add(appShuffleId.getAppId());
            stages.add(appShuffleId);
            int numPartitions = stageInfoStateItem.getNumPartitions();
            ShuffleWriteConfig writeConfig = stageInfoStateItem.getWriteConfig();
            // Start past the stored index plus the number of splits.
            int newStartIndex = stageInfoStateItem.getFileStartIndex() + writeConfig.getNumSplits();
            byte fileStatus = stageInfoStateItem.getFileStatus();

            // Check whether the stage state is already set; if not, create it.
            ExecutorShuffleStageState oldStageState = stageStates.get(appShuffleId);
            ExecutorShuffleStageState effectiveStageState;
            if (oldStageState == null) {
                ExecutorShuffleStageState newStageState =
                    new ExecutorShuffleStageState(appShuffleId, writeConfig, newStartIndex);
                newStageState.setNumMapsPartitions(numPartitions);
                stageStates.put(appShuffleId, newStageState);
                effectiveStageState = newStageState;
            } else {
                effectiveStageState = oldStageState;
                // Stage state is already set: check it against the values loaded from state.
                if (oldStageState.getNumPartitions() != numPartitions) {
                    oldStageState.setFileCorrupted();
                    stateLoadWarnings.inc(1);
                    logger.warn(
                        "Got different numPartitions when loading state for {}, old value: {}, new value: {}",
                        appShuffleId, oldStageState.getNumPartitions(), numPartitions);
                    corruptedStages.add(appShuffleId);
                }
                if (!oldStageState.getWriteConfig().equals(writeConfig)) {
                    oldStageState.setFileCorrupted();
                    stateLoadWarnings.inc(1);
                    logger.warn(
                        "Got different stage write config when loading state for {}, old value: {}, new value: {}",
                        appShuffleId, oldStageState.getWriteConfig(), writeConfig);
                    corruptedStages.add(appShuffleId);
                }
                // Only ever move the start index forward.
                if (oldStageState.getFileStartIndex() < newStartIndex) {
                    int oldStartIndex = oldStageState.getFileStartIndex();
                    oldStageState.setFileStartIndex(newStartIndex);
                    logger.info(
                        "Bump file start index for {} from {} to {}, splits: {}",
                        appShuffleId, oldStartIndex, newStartIndex, writeConfig.getNumSplits());
                }
            }

            if (fileStatus == ShuffleStageStatus.FILE_STATUS_CORRUPTED) {
                effectiveStageState.setFileCorrupted();
                logger.info("Mark stage {} as corrupted due to loaded state marking it as corrupted", appShuffleId);
                corruptedStages.add(appShuffleId);
            }
            // Also covers stages that an earlier item already put into corruptedStages.
            if (corruptedStages.contains(appShuffleId)) {
                effectiveStageState.setFileCorrupted();
            }

            // Store stage info to make sure the next run will use the new file start index.
            stateStore.storeStageInfo(appShuffleId, new StagePersistentInfo(
                effectiveStageState.getNumPartitions(),
                effectiveStageState.getFileStartIndex(),
                effectiveStageState.getWriteConfig(),
                effectiveStageState.getFileStatus()));
        } else if (stateItem instanceof AppDeletionStateItem) {
            AppDeletionStateItem appDeletionStateItem = (AppDeletionStateItem) stateItem;
            String appId = appDeletionStateItem.getAppId();
            appIds.add(appId);
            deletedApps.add(appId);
        } else if (stateItem instanceof TaskAttemptCommitStateItem) {
            TaskAttemptCommitStateItem taskAttemptCommitStateItem = (TaskAttemptCommitStateItem) stateItem;
            AppShuffleId appShuffleId = taskAttemptCommitStateItem.getAppShuffleId();
            appIds.add(appShuffleId.getAppId());
            stages.add(appShuffleId);
            ExecutorShuffleStageState stageState = stageStates.get(appShuffleId);
            if (stageState == null) {
                // A commit without a known stage state cannot be applied; flag the stage instead.
                stateLoadWarnings.inc(1);
                logger.warn(
                    "Got TaskAttemptCommitStateItem: {}, but there is no stage state for {}",
                    taskAttemptCommitStateItem, appShuffleId);
                corruptedStages.add(appShuffleId);
            } else {
                for (Long mapTaskAttemptId : taskAttemptCommitStateItem.getMapTaskAttemptIds()) {
                    stageState.commitMapTask(mapTaskAttemptId);
                }
                stageState.addFinalizedFiles(taskAttemptCommitStateItem.getPartitionFilePathAndLengths());
                if (corruptedStages.contains(appShuffleId)) {
                    stageState.setFileCorrupted();
                }
            }
        } else if (stateItem instanceof StageCorruptionStateItem) {
            StageCorruptionStateItem stageCorruptionStateItem = (StageCorruptionStateItem) stateItem;
            corruptedStages.add(stageCorruptionStateItem.getAppShuffleId());
        } else {
            stateLoadWarnings.inc(1);
            logger.warn("Got unsupported state item: {}", stateItem);
        }
    }