in src/main/java/com/uber/rss/execution/ShuffleExecutor.java [648:741]
/**
 * Applies one loaded state item to the in-memory stage states and records the ids it
 * references into the caller-supplied sets.
 *
 * <p>The sets act as accumulators. This method adds to all of them. It also reads
 * {@code corruptedStages} to decide whether a stage should be marked corrupted, so results
 * depend on items already processed with the same sets. Presumably the caller passes the
 * same sets for every item of one load; confirm against the caller.
 *
 * @param stateItem the loaded state item; an unsupported item type is logged and counted as
 *     a load warning
 * @param appIds receives the app id of every stage-info, app-deletion and
 *     task-attempt-commit item
 * @param deletedApps receives the app id of every app-deletion item
 * @param stages receives the shuffle id of every stage-info and task-attempt-commit item
 * @param corruptedStages receives the shuffle id of every stage detected or recorded as
 *     corrupted; also read to propagate corruption into the stage state
 */
private void loadStateImpl(BaseMessage stateItem, Set<String> appIds, Set<String> deletedApps, Set<AppShuffleId> stages, Set<AppShuffleId> corruptedStages) {
    if (stateItem instanceof StageInfoStateItem) {
        loadStageInfoItem((StageInfoStateItem) stateItem, appIds, stages, corruptedStages);
    } else if (stateItem instanceof AppDeletionStateItem) {
        String appId = ((AppDeletionStateItem) stateItem).getAppId();
        appIds.add(appId);
        deletedApps.add(appId);
    } else if (stateItem instanceof TaskAttemptCommitStateItem) {
        loadTaskAttemptCommitItem((TaskAttemptCommitStateItem) stateItem, appIds, stages, corruptedStages);
    } else if (stateItem instanceof StageCorruptionStateItem) {
        // Only recorded here. Stage-info and task-commit items processed later (and
        // presumably the caller) consult corruptedStages to mark the stage state itself.
        corruptedStages.add(((StageCorruptionStateItem) stateItem).getAppShuffleId());
    } else {
        stateLoadWarnings.inc(1);
        logger.warn("Got unsupported state item: {}", stateItem);
    }
}

/**
 * Creates or reconciles the in-memory state of one stage from a loaded stage-info item.
 * The resulting stage info is then persisted back through {@code stateStore}.
 *
 * <p>If the stage already has state, a different numPartitions or write config marks it
 * corrupted and counts a load warning. The file start index is only ever raised, never
 * lowered.
 */
private void loadStageInfoItem(StageInfoStateItem stageInfoStateItem, Set<String> appIds, Set<AppShuffleId> stages, Set<AppShuffleId> corruptedStages) {
    AppShuffleId appShuffleId = stageInfoStateItem.getAppShuffleId();
    appIds.add(appShuffleId.getAppId());
    stages.add(appShuffleId);

    int numPartitions = stageInfoStateItem.getNumPartitions();
    ShuffleWriteConfig writeConfig = stageInfoStateItem.getWriteConfig();
    // Advance past the splits starting at the loaded index, so the stage info stored below
    // makes the next run use a new file start index (presumably to avoid reusing the file
    // indices of the previous run).
    int newStartIndex = stageInfoStateItem.getFileStartIndex() + writeConfig.getNumSplits();
    byte fileStatus = stageInfoStateItem.getFileStatus();

    ExecutorShuffleStageState stageState = stageStates.get(appShuffleId);
    if (stageState == null) {
        // First stage-info item seen for this stage: create its state.
        stageState = new ExecutorShuffleStageState(appShuffleId, writeConfig, newStartIndex);
        stageState.setNumMapsPartitions(numPartitions);
        stageStates.put(appShuffleId, stageState);
    } else {
        // Stage state already exists: check it against the values loaded from state.
        if (stageState.getNumPartitions() != numPartitions) {
            stageState.setFileCorrupted();
            stateLoadWarnings.inc(1);
            logger.warn(
                "Got different numPartitions when loading state for {}, old value: {}, new value: {}",
                appShuffleId, stageState.getNumPartitions(), numPartitions);
            corruptedStages.add(appShuffleId);
        }
        if (!stageState.getWriteConfig().equals(writeConfig)) {
            stageState.setFileCorrupted();
            stateLoadWarnings.inc(1);
            logger.warn(
                "Got different stage write config when loading state for {}, old value: {}, new value: {}",
                appShuffleId, stageState.getWriteConfig(), writeConfig);
            corruptedStages.add(appShuffleId);
        }
        if (stageState.getFileStartIndex() < newStartIndex) {
            int oldStartIndex = stageState.getFileStartIndex();
            stageState.setFileStartIndex(newStartIndex);
            logger.info(
                "Bump file start index for {} from {} to {}, splits: {}",
                appShuffleId, oldStartIndex, newStartIndex, writeConfig.getNumSplits());
        }
    }

    if (fileStatus == ShuffleStageStatus.FILE_STATUS_CORRUPTED) {
        stageState.setFileCorrupted();
        logger.info("Mark stage {} as corrupted due to loaded state marking it as corrupted", appShuffleId);
        corruptedStages.add(appShuffleId);
    }
    // Also covers corruption recorded by earlier items (e.g. a StageCorruptionStateItem).
    if (corruptedStages.contains(appShuffleId)) {
        stageState.setFileCorrupted();
    }

    // Store stage info so the next run will use the new file start index.
    stateStore.storeStageInfo(appShuffleId, new StagePersistentInfo(
        stageState.getNumPartitions(),
        stageState.getFileStartIndex(),
        stageState.getWriteConfig(),
        stageState.getFileStatus()));
}

/**
 * Applies a loaded task-attempt commit to its stage's state. If no stage state has been
 * loaded for the stage yet, the commit is not applied; instead the stage is recorded as
 * corrupted and a load warning is counted.
 */
private void loadTaskAttemptCommitItem(TaskAttemptCommitStateItem taskAttemptCommitStateItem, Set<String> appIds, Set<AppShuffleId> stages, Set<AppShuffleId> corruptedStages) {
    AppShuffleId appShuffleId = taskAttemptCommitStateItem.getAppShuffleId();
    appIds.add(appShuffleId.getAppId());
    stages.add(appShuffleId);

    ExecutorShuffleStageState stageState = stageStates.get(appShuffleId);
    if (stageState == null) {
        stateLoadWarnings.inc(1);
        logger.warn(
            "Got TaskAttemptCommitStateItem: {}, but there is no stage state for {}",
            taskAttemptCommitStateItem, appShuffleId);
        corruptedStages.add(appShuffleId);
        return;
    }

    for (Long mapTaskAttemptId : taskAttemptCommitStateItem.getMapTaskAttemptIds()) {
        stageState.commitMapTask(mapTaskAttemptId);
    }
    stageState.addFinalizedFiles(taskAttemptCommitStateItem.getPartitionFilePathAndLengths());
    if (corruptedStages.contains(appShuffleId)) {
        stageState.setFileCorrupted();
    }
}