protected void persistToCassandra()

in cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarStatePersister.java [166:219]


    protected void persistToCassandra(boolean force)
    {
        // clean-up finished futures
        activeFlush.removeIf(wrapper -> {
            if (wrapper.allDone())
            {
                try
                {
                    wrapper.await();
                    sidecarCdcStats.capturePersistSucceeded(System.nanoTime() - wrapper.startTimeNanos);
                }
                catch (InterruptedException e)
                {
                    LOGGER.warn("Persist failed with InterruptedException", e);
                    Thread.currentThread().interrupt();
                    sidecarCdcStats.capturePersistFailed(e);
                }
                catch (Throwable throwable)
                {
                    LOGGER.warn("Persist failed", throwable);
                    sidecarCdcStats.capturePersistFailed(throwable);
                }
                return true;
            }
            return false;
        });

        if (!force && !activeFlush.isEmpty())
        {
            // check for active requests so we don't get backed up
            LOGGER.debug("CDC persist flush backed up, can't persist until active requests complete activeRequests={}", activeFlush.size());
            sidecarCdcStats.capturePersistBackedUp(activeFlush.size());
            return;
        }

        // drain the latestState map, so we don't persist multiple times wastefully
        List<PersistWrapper> states = this.latestState
                                      .keySet()
                                      .stream()
                                      .map(this.latestState::remove)
                                      .filter(Objects::nonNull)
                                      .collect(Collectors.toList());

        if (states.isEmpty())
        {
            // nothing to persist
            return;
        }

        states.stream()
              .map(this::persistToCassandra)
              .filter(Objects::nonNull)
              .forEach(activeFlush::add);
    }