void cleanup()

in storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java [613:693]


    /**
     * Performs one cleanup pass over the locally cached resources.
     *
     * <p>The pass runs these steps in order:
     * <ol>
     *   <li>Every per-user archive map, every per-user file map and {@code topologyBlobs} are added to a single
     *       {@link LocalizedResourceRetentionSet} built with {@code cacheTargetSize}; that set is then asked to
     *       clean up against a client blob store.</li>
     *   <li>The topology ids derived from the keys of the deleted blobs are subtracted from the ids derived from
     *       the keys of {@code topologyBlobs}. Entries of {@code topologyBasicDownloaded} and {@code blobPending}
     *       whose key is not in the remaining set are dropped.</li>
     *   <li>Each topology dist directory whose jar, code and conf blob keys are all absent from
     *       {@code topologyBlobs} is deleted. A failure here is logged and the pass continues.</li>
     *   <li>Each user with neither file entries nor archive entries is handed to
     *       {@code LocalizedResource.completelyRemoveUnusedUser} and then removed from both user maps.</li>
     * </ol>
     *
     * <p>An {@link Exception} escaping the pass is logged and swallowed. An {@link Error} is logged and followed by
     * {@code Utils.exitProcess(20, ...)}. The "Finish cleanup" message is logged in every case.
     */
    void cleanup() {
        try {
            LOG.info("Starting cleanup");

            // Everything goes into one retention set so the eviction decision is made across all cached resources.
            LocalizedResourceRetentionSet retention = new LocalizedResourceRetentionSet(cacheTargetSize);
            for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> archiveEntry : userArchives.entrySet()) {
                retention.addResources(archiveEntry.getValue());
                LOG.debug("Resources to be cleaned after adding {} archives : {}", archiveEntry.getKey(), retention);
            }
            for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> fileEntry : userFiles.entrySet()) {
                retention.addResources(fileEntry.getValue());
                LOG.debug("Resources to be cleaned after adding {} files : {}", fileEntry.getKey(), retention);
            }
            retention.addResources(topologyBlobs);

            // Collect the topologies that lost at least one blob during this pass.
            Set<String> topologiesWithDeletes = new HashSet<>();
            try (ClientBlobStore blobStore = getClientBlobStore()) {
                for (LocallyCachedBlob removedBlob : retention.cleanup(blobStore)) {
                    String owningTopologyId = ConfigUtils.getIdFromBlobKey(removedBlob.getKey());
                    if (owningTopologyId == null) {
                        // No topology id could be derived from this key, so nothing to record.
                        continue;
                    }
                    topologiesWithDeletes.add(owningTopologyId);
                }
            }

            // Start from every id derivable from the tracked topology blobs, then drop the ones that had deletes.
            Set<String> untouchedTopologyIds = new HashSet<>();
            for (String trackedKey : topologyBlobs.keySet()) {
                untouchedTopologyIds.add(ConfigUtils.getIdFromBlobKey(trackedKey));
            }
            LOG.debug("Topologies {} can no longer be considered fully downloaded", topologiesWithDeletes);
            untouchedTopologyIds.removeAll(topologiesWithDeletes);

            // Forgetting these early is harmless.
            topologyBasicDownloaded.keySet().removeIf(id -> !untouchedTopologyIds.contains(id));
            blobPending.keySet().removeIf(id -> !untouchedTopologyIds.contains(id));

            try {
                forEachTopologyDistDir((distDir, distTopologyId) -> {
                    String jarKey = ConfigUtils.masterStormJarKey(distTopologyId);
                    String codeKey = ConfigUtils.masterStormCodeKey(distTopologyId);
                    String confKey = ConfigUtils.masterStormConfKey(distTopologyId);
                    boolean stillTracked = topologyBlobs.containsKey(jarKey)
                        || topologyBlobs.containsKey(codeKey)
                        || topologyBlobs.containsKey(confKey);
                    if (!stillTracked) {
                        // None of the topology's three blobs is tracked any more, so the directory goes.
                        fsOps.deleteIfExists(distDir.toFile());
                    }
                });
            } catch (Exception e) {
                LOG.error("Could not read topology directories for cleanup", e);
            }

            LOG.debug("Resource cleanup: {}", retention);

            // Snapshot the union of users from both maps so entries can be removed while looping.
            Set<String> knownUsers = new HashSet<>(userArchives.keySet());
            knownUsers.addAll(userFiles.keySet());
            for (String user : knownUsers) {
                ConcurrentMap<String, LocalizedResource> filesForUser = userFiles.get(user);
                ConcurrentMap<String, LocalizedResource> archivesForUser = userArchives.get(user);
                if ((filesForUser != null && !filesForUser.isEmpty())
                        || (archivesForUser != null && !archivesForUser.isEmpty())) {
                    // The user still has at least one cached file or archive.
                    continue;
                }

                LOG.debug("removing empty set: {}", user);
                try {
                    LocalizedResource.completelyRemoveUnusedUser(localBaseDir, user);
                    userFiles.remove(user);
                    userArchives.remove(user);
                } catch (IOException e) {
                    LOG.error("Error trying to delete cached user files", e);
                }
            }
        } catch (Exception ex) {
            LOG.error("AsyncLocalizer cleanup failure", ex);
        } catch (Error error) {
            LOG.error("AsyncLocalizer cleanup failure", error);
            Utils.exitProcess(20, "AsyncLocalizer cleanup failure");
        } finally {
            LOG.info("Finish cleanup");
        }
    }