in storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java [613:693]
void cleanup() {
try {
LOG.info("Starting cleanup");
LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(cacheTargetSize);
// need one large set of all and then clean via LRU
for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> t : userArchives.entrySet()) {
toClean.addResources(t.getValue());
LOG.debug("Resources to be cleaned after adding {} archives : {}", t.getKey(), toClean);
}
for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> t : userFiles.entrySet()) {
toClean.addResources(t.getValue());
LOG.debug("Resources to be cleaned after adding {} files : {}", t.getKey(), toClean);
}
toClean.addResources(topologyBlobs);
Set<String> topologiesWithDeletes = new HashSet<>();
try (ClientBlobStore store = getClientBlobStore()) {
Set<LocallyCachedBlob> deletedBlobs = toClean.cleanup(store);
for (LocallyCachedBlob deletedBlob : deletedBlobs) {
String topologyId = ConfigUtils.getIdFromBlobKey(deletedBlob.getKey());
if (topologyId != null) {
topologiesWithDeletes.add(topologyId);
}
}
}
HashSet<String> safeTopologyIds = new HashSet<>();
for (String blobKey : topologyBlobs.keySet()) {
safeTopologyIds.add(ConfigUtils.getIdFromBlobKey(blobKey));
}
LOG.debug("Topologies {} can no longer be considered fully downloaded", topologiesWithDeletes);
safeTopologyIds.removeAll(topologiesWithDeletes);
//Deleting this early does not hurt anything
topologyBasicDownloaded.keySet().removeIf(topoId -> !safeTopologyIds.contains(topoId));
blobPending.keySet().removeIf(topoId -> !safeTopologyIds.contains(topoId));
try {
forEachTopologyDistDir((p, topologyId) -> {
String topoJarKey = ConfigUtils.masterStormJarKey(topologyId);
String topoCodeKey = ConfigUtils.masterStormCodeKey(topologyId);
String topoConfKey = ConfigUtils.masterStormConfKey(topologyId);
if (!topologyBlobs.containsKey(topoJarKey)
&& !topologyBlobs.containsKey(topoCodeKey)
&& !topologyBlobs.containsKey(topoConfKey)) {
fsOps.deleteIfExists(p.toFile());
}
});
} catch (Exception e) {
LOG.error("Could not read topology directories for cleanup", e);
}
LOG.debug("Resource cleanup: {}", toClean);
Set<String> allUsers = new HashSet<>(userArchives.keySet());
allUsers.addAll(userFiles.keySet());
for (String user : allUsers) {
ConcurrentMap<String, LocalizedResource> filesForUser = userFiles.get(user);
ConcurrentMap<String, LocalizedResource> archivesForUser = userArchives.get(user);
if ((filesForUser == null || filesForUser.size() == 0)
&& (archivesForUser == null || archivesForUser.size() == 0)) {
LOG.debug("removing empty set: {}", user);
try {
LocalizedResource.completelyRemoveUnusedUser(localBaseDir, user);
userFiles.remove(user);
userArchives.remove(user);
} catch (IOException e) {
LOG.error("Error trying to delete cached user files", e);
}
}
}
} catch (Exception ex) {
LOG.error("AsyncLocalizer cleanup failure", ex);
} catch (Error error) {
LOG.error("AsyncLocalizer cleanup failure", error);
Utils.exitProcess(20, "AsyncLocalizer cleanup failure");
} finally {
LOG.info("Finish cleanup");
}
}