protected void cleanUpCdcRawDirectory()

in server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleaner.java [192:284]


    protected void cleanUpCdcRawDirectory(File cdcRawDirectory)
    {
        if (!cdcRawDirectory.exists() || !cdcRawDirectory.isDirectory())
        {
            LOGGER.debug("Skipping CdcRawDirectorySpaceCleaner: CDC directory does not exist: {}", cdcRawDirectory);
            return;
        }

        List<CdcRawSegmentFile> segmentFiles = Optional
                                               .ofNullable(
                                               cdcRawDirectory.listFiles(this::validSegmentFilter))
                                               .map(files -> Arrays.stream(files)
                                                                   .map(CdcRawSegmentFile::new)
                                                                   .filter(
                                                                   CdcRawSegmentFile::indexExists)
                                                                   .collect(Collectors.toList())
                                               )
                                               .orElseGet(List::of);
        publishCdcStats(segmentFiles);
        if (segmentFiles.size() < 2)
        {
            LOGGER.debug("Skipping cdc data cleaner routine cleanup: No cdc data or only one single cdc segment is found.");
            return;
        }

        long directorySize = FileUtils.directorySize(cdcRawDirectory);
        long upperLimitBytes =
        (long) (maxUsage() * cdcConfiguration.cdcRawDirectoryMaxPercentUsage());
        // Sort the files by segmentId to delete commit log segments in write order
        // The latest file is the current active segment, but it could be created before the retention duration, e.g. slow data ingress
        Collections.sort(segmentFiles);
        long nowInMillis = timeProvider.currentTimeMillis();

        // track the age of the oldest commit log segment to give indication of the time-window buffer available
        cdcMetrics.oldestSegmentAge.metric.setValue(
        (int) MILLISECONDS.toSeconds(nowInMillis - segmentFiles.get(0).lastModified()));

        if (directorySize > upperLimitBytes)
        {
            if (segmentFiles.get(0).segmentId > segmentFiles.get(1).segmentId)
            {
                LOGGER.error("Cdc segments sorted incorrectly {} before {}",
                             segmentFiles.get(0).segmentId, segmentFiles.get(1).segmentId);
            }

            long criticalMillis = cdcConfiguration.cdcRawDirectoryCriticalBufferWindow().toMillis();
            long lowMillis = cdcConfiguration.cdcRawDirectoryLowBufferWindow().toMillis();

            // we keep the last commit log segment as it may still be actively written to
            int i = 0;
            while (i < segmentFiles.size() - 1 && directorySize > upperLimitBytes)
            {
                CdcRawSegmentFile segment = segmentFiles.get(i);
                long ageMillis = nowInMillis - segment.lastModified();

                if (ageMillis < criticalMillis)
                {
                    LOGGER.error("Insufficient Cdc buffer size to maintain {}-minute window segment={} maxSize={} ageMinutes={}",
                                 MILLISECONDS.toMinutes(criticalMillis), segment, upperLimitBytes,
                                 MILLISECONDS.toMinutes(ageMillis));
                    cdcMetrics.criticalCdcRawSpace.metric.update(1);
                }
                else if (ageMillis < lowMillis)
                {
                    LOGGER.warn("Insufficient Cdc buffer size to maintain {}-minute window segment={} maxSize={} ageMinutes={}",
                                MILLISECONDS.toMinutes(lowMillis), segment, upperLimitBytes,
                                MILLISECONDS.toMinutes(ageMillis));
                    cdcMetrics.lowCdcRawSpace.metric.update(1);
                }
                long length = 0;
                try
                {
                    length = deleteSegment(segment);
                    cdcMetrics.deletedSegment.metric.update(length);
                }
                catch (IOException e)
                {
                    LOGGER.warn("Failed to delete cdc segment", e);
                }
                directorySize -= length;
                i++;
            }
        }

        try
        {
            cleanupOrphanedIdxFiles(cdcRawDirectory);
        }
        catch (IOException e)
        {
            LOGGER.warn("Failed to clean up orphaned idx files", e);
        }
    }