in src/main/java/com/uber/rss/execution/ShuffleExecutor.java [591:646]
/**
 * Replays the items persisted in the state store and rebuilds the in-memory
 * application and stage bookkeeping from them.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Feed every item from {@code stateStore.loadData()} to {@code loadStateImpl},
 *       stopping early once more than {@code MAX_STATE_LOAD_MILLIS} has elapsed
 *       (reported as a partial load and counted in {@code statePartialLoads}).</li>
 *   <li>Flag every corrupted stage that has an entry in {@code stageStates} and
 *       write a corruption record for it back to the state store.</li>
 *   <li>Drop the stage states of every deleted application, write a deletion
 *       record per deleted application, and commit the state store.</li>
 *   <li>Register an {@code ExecutorAppState} with a refreshed liveness timestamp
 *       for every application that was seen and not deleted.</li>
 * </ol>
 *
 * @return a summary of the load: whether it was partial, the number of items
 *         processed, and the sizes of the app / deleted-app / stage /
 *         corrupted-stage sets plus the number of stage states removed
 */
private StateStoreLoadResult loadStateStoreImpl() {
  long startTime = System.currentTimeMillis();
  boolean partialLoad = false;
  long totalDataItems = 0;

  // Accumulators handed to loadStateImpl for every item
  // (presumably filled in by it; verify against loadStateImpl).
  Set<String> appIds = new HashSet<>();
  Set<String> deletedApps = new HashSet<>();
  Set<AppShuffleId> stages = new HashSet<>();
  Set<AppShuffleId> corruptedStages = new HashSet<>();

  LocalFileStateStoreIterator stateItemIterator = stateStore.loadData();
  try {
    while (stateItemIterator.hasNext()) {
      BaseMessage item = stateItemIterator.next();
      loadStateImpl(item, appIds, deletedApps, stages, corruptedStages);
      totalDataItems++;
      // The time budget is checked after an item is processed, so an available
      // first item is always loaded even if the budget is already exhausted.
      if (System.currentTimeMillis() - startTime > MAX_STATE_LOAD_MILLIS) {
        partialLoad = true;
        statePartialLoads.inc(1);
        break;
      }
    }
  } finally {
    // Always release the iterator, including when loadStateImpl throws.
    stateItemIterator.close();
  }

  // Only stages that currently have an in-memory state can be flagged.
  for (AppShuffleId corruptedStage : corruptedStages) {
    ExecutorShuffleStageState stageState = stageStates.get(corruptedStage);
    if (stageState != null) {
      stageState.setFileCorrupted();
      stateStore.storeStageCorruption(stageState.getAppShuffleId());
    }
  }

  // Collect the stages of all deleted applications in a single pass over the
  // stage keys instead of re-scanning the whole key set once per deleted app.
  // The keys are gathered first and removed afterwards so the map is not
  // modified while it is being streamed.
  List<AppShuffleId> appShuffleIdsToDelete = stageStates.keySet().stream()
      .filter(t -> deletedApps.contains(t.getAppId()))
      .collect(Collectors.toList());
  for (AppShuffleId entry : appShuffleIdsToDelete) {
    stageStates.remove(entry);
  }
  int deletedStageCount = appShuffleIdsToDelete.size();

  // A deletion record is written for every deleted application, whether or not
  // it still had stage states in memory.
  for (String appId : deletedApps) {
    stateStore.storeAppDeletion(appId);
  }
  stateStore.commit();

  // Applications that were deleted must not be registered as live.
  appIds.removeAll(deletedApps);
  for (String appId : appIds) {
    ExecutorAppState appState = new ExecutorAppState(appId);
    appState.updateLivenessTimestamp();
    appStates.put(appId, appState);
  }

  return new StateStoreLoadResult(
      partialLoad,
      totalDataItems,
      appIds.size(),
      deletedApps.size(),
      stages.size(),
      corruptedStages.size(),
      deletedStageCount);
}