in src/main/java/com/uber/rss/execution/ShuffleExecutor.java [537:589]
/**
 * Removes every application whose liveness timestamp is older than
 * {@code currentTimeMillis - appRetentionMillis}.
 *
 * <p>For each expired application this method:
 * <ol>
 *   <li>removes its entry from {@code appStates};</li>
 *   <li>removes all of its entries from {@code stageStates} and closes their writers;</li>
 *   <li>records the deletion in {@code stateStore};</li>
 *   <li>deletes its shuffle directory under {@code rootDir} through {@code storage}.</li>
 * </ol>
 *
 * <p>Every cleanup step is best-effort: a failure is logged and the remaining steps and
 * remaining applications are still processed. This matters because an application is
 * dropped from {@code appStates} before its resources are released, so it will not be
 * picked up as expired again by a later invocation of this method.
 *
 * <p>Also increments {@code numExpiredApplications} by the number of expired applications
 * found and updates {@code numLiveApplications} with the number of applications left.
 */
private void removeExpiredApplications() {
    long currentMillis = System.currentTimeMillis();

    // First pass: only collect the expired ids; removal happens afterwards.
    List<String> expiredAppIds = new ArrayList<>();
    for (Map.Entry<String, ExecutorAppState> entry : appStates.entrySet()) {
        if (entry.getValue().getLivenessTimestamp() < currentMillis - appRetentionMillis) {
            String appId = entry.getKey();
            expiredAppIds.add(appId);
            logger.info("Found expired application: {}", appId);
        }
    }
    numExpiredApplications.inc(expiredAppIds.size());

    for (String appId : expiredAppIds) {
        appStates.remove(appId);

        List<AppShuffleId> expiredAppShuffleIds = stageStates.keySet()
            .stream()
            .filter(t -> t.getAppId().equals(appId))
            .collect(Collectors.toList());
        // remove() returns null when the key is already gone, hence the null filter.
        List<ExecutorShuffleStageState> removedAppShuffleStageStates =
            expiredAppShuffleIds.stream()
                .map(t -> stageStates.remove(t))
                .filter(t -> t != null)
                .collect(Collectors.toList());

        // Close writers in case there are still writers not closed. Each close is guarded
        // individually so that one failing stage cannot abort the cleanup of the other
        // stages, the state store update, or the directory deletion below.
        for (ExecutorShuffleStageState stageState : removedAppShuffleStageStates) {
            try {
                stageState.closeWriters();
            } catch (Throwable ex) {
                logger.warn(
                    String.format("Failed to close writers when removing expired application: %s", appId),
                    ex);
            }
        }

        try {
            stateStore.storeAppDeletion(appId);
        } catch (Throwable ex) {
            logger.warn("Failed to add app deletion in state store when removing expired application", ex);
        }

        logger.info("Removed expired application from internal state: {}, number of app shuffle id: {}",
            appId,
            expiredAppShuffleIds.size());
    }
    numLiveApplications.update(appStates.size());

    // Delete on-disk data only after all in-memory state has been dropped.
    for (String appId : expiredAppIds) {
        String appDir = ShuffleFileUtils.getAppShuffleDir(rootDir, appId);
        try {
            logger.info("Deleting expired application directory: {}", appDir);
            storage.deleteDirectory(appDir);
        } catch (Throwable ex) {
            logger.warn(String.format("Failed to delete expired application directory: %s", appDir), ex);
        }
    }
}