in cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarStatePersister.java [166:219]
protected void persistToCassandra(boolean force)
{
// clean-up finished futures
activeFlush.removeIf(wrapper -> {
if (wrapper.allDone())
{
try
{
wrapper.await();
sidecarCdcStats.capturePersistSucceeded(System.nanoTime() - wrapper.startTimeNanos);
}
catch (InterruptedException e)
{
LOGGER.warn("Persist failed with InterruptedException", e);
Thread.currentThread().interrupt();
sidecarCdcStats.capturePersistFailed(e);
}
catch (Throwable throwable)
{
LOGGER.warn("Persist failed", throwable);
sidecarCdcStats.capturePersistFailed(throwable);
}
return true;
}
return false;
});
if (!force && !activeFlush.isEmpty())
{
// check for active requests so we don't get backed up
LOGGER.debug("CDC persist flush backed up, can't persist until active requests complete activeRequests={}", activeFlush.size());
sidecarCdcStats.capturePersistBackedUp(activeFlush.size());
return;
}
// drain the latestState map, so we don't persist multiple times wastefully
List<PersistWrapper> states = this.latestState
.keySet()
.stream()
.map(this.latestState::remove)
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (states.isEmpty())
{
// nothing to persist
return;
}
states.stream()
.map(this::persistToCassandra)
.filter(Objects::nonNull)
.forEach(activeFlush::add);
}