private void removeExpiredApplications()

in src/main/java/com/uber/rss/execution/ShuffleExecutor.java [537:589]


    /**
     * Removes every application whose liveness timestamp is older than the retention window.
     *
     * <p>An application is considered expired when its liveness timestamp is strictly less than
     * {@code now - appRetentionMillis}. The work is done in three passes:
     * <ol>
     *   <li>collect the ids of expired entries in {@code appStates};</li>
     *   <li>for each expired id, drop its entry from {@code appStates}, drop all of its entries
     *       from {@code stageStates}, close the writers of the removed stage states, and record
     *       the deletion in {@code stateStore};</li>
     *   <li>delete each expired application's shuffle directory through {@code storage}.</li>
     * </ol>
     *
     * <p>Closing writers, recording the deletion and deleting the directory are all best-effort:
     * a failure is logged as a warning and the remaining cleanup continues. This matters because
     * an application is removed from {@code appStates} before its cleanup runs, so a later
     * invocation would not find it again to retry.
     *
     * <p>{@code numExpiredApplications} is incremented by the number of expired applications
     * found, and {@code numLiveApplications} is updated to the size of {@code appStates} after
     * the removals.
     */
    private void removeExpiredApplications() {
        long currentMillis = System.currentTimeMillis();
        // Loop-invariant: anything with a liveness timestamp below this value is expired.
        long expirationThresholdMillis = currentMillis - appRetentionMillis;

        List<String> expiredAppIds = new ArrayList<>();
        for (Map.Entry<String, ExecutorAppState> entry : appStates.entrySet()) {
            if (entry.getValue().getLivenessTimestamp() < expirationThresholdMillis) {
                String appId = entry.getKey();
                expiredAppIds.add(appId);
                logger.info("Found expired application: {}", appId);
            }
        }

        numExpiredApplications.inc(expiredAppIds.size());

        for (String appId : expiredAppIds) {
            appStates.remove(appId);

            // Snapshot the matching keys first, then remove them from the map.
            List<AppShuffleId> expiredAppShuffleIds = stageStates.keySet()
                    .stream()
                    .filter(t -> t.getAppId().equals(appId))
                    .collect(Collectors.toList());
            // remove() may return null if the entry disappeared after the snapshot; skip those.
            List<ExecutorShuffleStageState> removedAppShuffleStageStates =
                    expiredAppShuffleIds.stream()
                            .map(t -> stageStates.remove(t))
                            .filter(t -> t != null)
                            .collect(Collectors.toList());

            // Close writers in case there are still writers not closed. Each close is isolated
            // so that one failing stage state cannot abort the rest of the cleanup (remaining
            // writers, state store update, directory deletion for all expired applications).
            for (ExecutorShuffleStageState stageState : removedAppShuffleStageStates) {
                try {
                    stageState.closeWriters();
                } catch (Throwable ex) {
                    logger.warn(
                            String.format("Failed to close writers when removing expired application: %s", appId),
                            ex);
                }
            }

            try {
                stateStore.storeAppDeletion(appId);
            } catch (Throwable ex) {
                logger.warn("Failed to add app deletion in state store when removing expired application", ex);
            }

            logger.info("Removed expired application from internal state: {}, number of app shuffle id: {}",
                    appId,
                    expiredAppShuffleIds.size());
        }

        numLiveApplications.update(appStates.size());

        // Directory deletion runs after all in-memory state has been cleaned up.
        for (String appId : expiredAppIds) {
            String appDir = ShuffleFileUtils.getAppShuffleDir(rootDir, appId);
            try {
                logger.info("Deleting expired application directory: {}", appDir);
                storage.deleteDirectory(appDir);
            } catch (Throwable ex) {
                logger.warn(String.format("Failed to delete expired application directory: %s", appDir), ex);
            }
        }
    }