in priam/src/main/java/com/netflix/priam/backupv2/BackupTTLTask.java [98:219]
    /**
     * Runs one pass of the backup TTL job.
     *
     * <p>The pass is skipped while a restore is in progress. Otherwise it:
     *
     * <ol>
     *   <li>acquires the job lock (failing fast if another run holds it),
     *   <li>optionally sleeps a random amount of time bounded by {@code maxWaitMillis},
     *   <li>loads the list of component files from the last meta file returned for the
     *       retention window,
     *   <li>deletes meta files older than the retention window, and
     *   <li>walks the remote SST listing, deleting every file older than retention plus the
     *       compaction grace period that is not referenced by the loaded meta file.
     * </ol>
     *
     * <p>The lock is always released before this method returns or throws once it was acquired.
     *
     * @throws Exception if another run already holds the lock, if the thread is interrupted
     *     while sleeping, or if any of the underlying meta/file-system operations fail.
     */
    public void execute() throws Exception {
        if (instanceState.getRestoreStatus() != null
                && instanceState.getRestoreStatus().getStatus() != null
                && instanceState.getRestoreStatus().getStatus() == Status.STARTED) {
            logger.info("Not executing the TTL Task for backups as Priam is in restore mode.");
            return;
        }

        // Do not allow more than one backupTTLService to run at the same time. This is possible
        // as this happens on CRON.
        if (!lock.tryLock()) {
            logger.warn("{} is already running! Try again later.", JOBNAME);
            throw new Exception(JOBNAME + " already running");
        }

        // Everything after a successful tryLock() must live inside this try so that the lock is
        // released even if the sleep below is interrupted.
        try {
            // Sleep a random amount but not so long that it will spill into the next token's
            // turn.
            if (maxWaitMillis > 0) {
                Thread.sleep(new Random().nextInt(maxWaitMillis));
            }

            filesInMeta.clear();
            filesToDelete.clear();
            Instant dateToTtl =
                    DateUtil.getInstant().minus(config.getBackupRetentionDays(), ChronoUnit.DAYS);

            // Find the snapshot just after this date.
            List<AbstractBackupPath> metas =
                    metaProxy.findMetaFiles(
                            new DateUtil.DateRange(dateToTtl, DateUtil.getInstant()));
            // Treat a null listing like an empty one, same as the old-META lookup below.
            if (metas == null || metas.isEmpty()) {
                logger.info("No meta file found and thus cannot run TTL Service");
                return;
            }

            // Get the first file after the TTL time as we get files which are sorted latest to
            // oldest.
            AbstractBackupPath metaFile = metas.get(metas.size() - 1);

            // Download the meta file to local file system.
            Path localFile = metaProxy.downloadMetaFile(metaFile);
            try {
                // Walk over the file system iterator and if not in map, it is eligible for
                // delete.
                new MetaFileWalker().readMeta(localFile);
                logger.info(
                        "No. of component files loaded from meta file: {}", filesInMeta.size());
            } finally {
                // Delete the meta file downloaded locally, even when reading it failed, so a
                // bad meta file does not leave a stray copy behind on every run.
                FileUtils.deleteQuietly(localFile.toFile());
            }

            // If there are no files listed in meta, do not delete. This could be a bug!!
            if (filesInMeta.isEmpty()) {
                logger.warn("Meta file was empty. This should not happen. Getting out!!");
                return;
            }

            // Delete the old META files. We are giving start date which is so back in past to
            // get all the META files.
            // This feature did not exist in Jan 2018.
            metas =
                    metaProxy.findMetaFiles(
                            new DateUtil.DateRange(
                                    start_of_feature, dateToTtl.minus(1, ChronoUnit.HOURS)));
            if (metas != null && !metas.isEmpty()) {
                logger.info(
                        "Will delete(TTL) {} META files starting from: [{}]",
                        metas.size(),
                        metas.get(metas.size() - 1).getLastModified());
                for (AbstractBackupPath meta : metas) {
                    deleteFile(meta, false);
                }
            }

            Iterator<String> remoteFileLocations =
                    fileSystem.listFileSystem(getSSTPrefix(), null, null);

            /*
            We really cannot delete the files until the TTL period.
            Cassandra can flush files on file system like Index.db first and other component
            files later (like 30 mins). If there is a snapshot in between, then this "single"
            component file would not be part of the snapshot as SSTable is still not part of
            Cassandra's "view". Only if Cassandra could provide strong guarantees on the file
            system such that -
            1. All component will be flushed to disk as real SSTables only if they are part of
               the view. Until that happens all the files will be "tmp" files.
            2. All component flushed will have the same "last modified" file. i.e. on the first
               flush. Stats.db can change over time and that is OK.
            Since this is not the case, the TTL may end up deleting this file even though the
            file is part of the next snapshot. To avoid this, we add grace period (based on how
            long compaction can run) when we delete the files.
            */
            dateToTtl = dateToTtl.minus(config.getGracePeriodDaysForCompaction(), ChronoUnit.DAYS);
            logger.info(
                    "Will delete(TTL) SST_V2 files which are before this time: {}. Input: [TTL: {} days, Grace Period: {} days]",
                    dateToTtl,
                    config.getBackupRetentionDays(),
                    config.getGracePeriodDaysForCompaction());

            while (remoteFileLocations.hasNext()) {
                AbstractBackupPath abstractBackupPath = abstractBackupPathProvider.get();
                abstractBackupPath.parseRemote(remoteFileLocations.next());

                // If lastModifiedTime is after the dateToTTL, we should get out of this loop as
                // remote file systems always give locations which are sorted.
                if (abstractBackupPath.getLastModified().isAfter(dateToTtl)) {
                    logger.info(
                            "Breaking from TTL. Got a key which is after the TTL time: {}",
                            abstractBackupPath.getRemotePath());
                    break;
                }

                if (!filesInMeta.containsKey(abstractBackupPath.getRemotePath())) {
                    deleteFile(abstractBackupPath, false);
                } else if (logger.isDebugEnabled()) {
                    logger.debug(
                            "Not deleting this key as it is referenced in backups: {}",
                            abstractBackupPath.getRemotePath());
                }
            }

            // Delete remaining files.
            deleteFile(null, true);
            logger.info("Finished processing files for TTL service");
        } finally {
            lock.unlock();
        }
    }