in server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleaner.java [192:284]
/**
 * Runs one cleanup pass over the cdc_raw directory.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>If {@code cdcRawDirectory} does not exist or is not a directory, logs at debug level and returns.</li>
 *   <li>Collects the files accepted by {@code validSegmentFilter} for which {@code CdcRawSegmentFile::indexExists}
 *       holds, and publishes them via {@code publishCdcStats}.</li>
 *   <li>When at least two segments are present, deletes the oldest segments (never the newest one) while the
 *       directory exceeds its configured size budget.</li>
 *   <li>Calls {@code cleanupOrphanedIdxFiles}. This runs regardless of how many segments were found, so that
 *       orphaned idx files are not left behind until a later pass happens to see two or more segments.</li>
 * </ol>
 *
 * <p>IOExceptions from segment deletion or orphan cleanup are logged at warn level and do not abort the pass.
 *
 * @param cdcRawDirectory the cdc_raw directory to clean up
 */
protected void cleanUpCdcRawDirectory(File cdcRawDirectory)
{
    if (!cdcRawDirectory.exists() || !cdcRawDirectory.isDirectory())
    {
        LOGGER.debug("Skipping CdcRawDirectorySpaceCleaner: CDC directory does not exist: {}", cdcRawDirectory);
        return;
    }

    // listFiles returns null on an I/O error; treat that the same as "no segments".
    // The immutable List.of() fallback is only used when empty, and an empty list is never sorted below.
    List<CdcRawSegmentFile> segmentFiles = Optional
                                           .ofNullable(cdcRawDirectory.listFiles(this::validSegmentFilter))
                                           .map(files -> Arrays.stream(files)
                                                               .map(CdcRawSegmentFile::new)
                                                               .filter(CdcRawSegmentFile::indexExists)
                                                               .collect(Collectors.toList()))
                                           .orElseGet(List::of);
    publishCdcStats(segmentFiles);

    if (segmentFiles.size() < 2)
    {
        LOGGER.debug("Skipping cdc data cleaner routine cleanup: No cdc data or only one single cdc segment is found.");
    }
    else
    {
        deleteOldestSegmentsOverBudget(cdcRawDirectory, segmentFiles);
    }

    // Runs regardless of the segment count. Previously the "fewer than two segments" early return skipped it.
    try
    {
        cleanupOrphanedIdxFiles(cdcRawDirectory);
    }
    catch (IOException e)
    {
        LOGGER.warn("Failed to clean up orphaned idx files", e);
    }
}

/**
 * Deletes the oldest cdc segments while the cdc_raw directory exceeds its size budget.
 *
 * <p>The budget is {@code maxUsage() * cdcConfiguration.cdcRawDirectoryMaxPercentUsage()} bytes. The last segment
 * after sorting is never deleted because it may still be actively written to. A segment younger than the critical
 * or low buffer window is still deleted, but the event is logged and counted in {@code criticalCdcRawSpace} or
 * {@code lowCdcRawSpace} respectively. The {@code oldestSegmentAge} metric is set from the oldest segment
 * <em>before</em> any deletion in this pass.
 *
 * @param cdcRawDirectory the cdc_raw directory, used to measure its total size
 * @param segmentFiles    the segments found in the directory. It must be mutable and hold at least two entries,
 *                        and it is sorted in place.
 */
private void deleteOldestSegmentsOverBudget(File cdcRawDirectory, List<CdcRawSegmentFile> segmentFiles)
{
    long directorySize = FileUtils.directorySize(cdcRawDirectory);
    long upperLimitBytes = (long) (maxUsage() * cdcConfiguration.cdcRawDirectoryMaxPercentUsage());

    // Sort the files by segmentId to delete commit log segments in write order.
    // The latest file is the current active segment, but it could be created before the retention duration,
    // e.g. slow data ingress.
    Collections.sort(segmentFiles);

    long nowInMillis = timeProvider.currentTimeMillis();
    // Track the age of the oldest commit log segment to give an indication of the time-window buffer available.
    cdcMetrics.oldestSegmentAge.metric.setValue(
        (int) MILLISECONDS.toSeconds(nowInMillis - segmentFiles.get(0).lastModified()));

    if (directorySize <= upperLimitBytes)
    {
        return;
    }

    // Sanity check that the sort really ordered segments by ascending segmentId.
    if (segmentFiles.get(0).segmentId > segmentFiles.get(1).segmentId)
    {
        LOGGER.error("Cdc segments sorted incorrectly {} before {}",
                     segmentFiles.get(0).segmentId, segmentFiles.get(1).segmentId);
    }

    long criticalMillis = cdcConfiguration.cdcRawDirectoryCriticalBufferWindow().toMillis();
    long lowMillis = cdcConfiguration.cdcRawDirectoryLowBufferWindow().toMillis();
    // Stop before the last segment: we keep it as it may still be actively written to.
    for (int i = 0; i < segmentFiles.size() - 1 && directorySize > upperLimitBytes; i++)
    {
        CdcRawSegmentFile segment = segmentFiles.get(i);
        long ageMillis = nowInMillis - segment.lastModified();
        if (ageMillis < criticalMillis)
        {
            LOGGER.error("Insufficient Cdc buffer size to maintain {}-minute window segment={} maxSize={} ageMinutes={}",
                         MILLISECONDS.toMinutes(criticalMillis), segment, upperLimitBytes,
                         MILLISECONDS.toMinutes(ageMillis));
            cdcMetrics.criticalCdcRawSpace.metric.update(1);
        }
        else if (ageMillis < lowMillis)
        {
            LOGGER.warn("Insufficient Cdc buffer size to maintain {}-minute window segment={} maxSize={} ageMinutes={}",
                        MILLISECONDS.toMinutes(lowMillis), segment, upperLimitBytes,
                        MILLISECONDS.toMinutes(ageMillis));
            cdcMetrics.lowCdcRawSpace.metric.update(1);
        }

        long length = 0;
        try
        {
            length = deleteSegment(segment);
            cdcMetrics.deletedSegment.metric.update(length);
        }
        catch (IOException e)
        {
            LOGGER.warn("Failed to delete cdc segment", e);
        }
        // A failed deletion frees nothing, so length stays 0 and the loop moves on to the next-oldest segment.
        directorySize -= length;
    }
}