public CdcStreamScanner build()

in cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/scanner/CdcScannerBuilder.java [254:321]


    /**
     * Builds the {@link CdcStreamScanner} for this batch.
     * <p>
     * Blocks on every pending CommitLog read future and collects the CDC partition updates from the
     * futures that completed successfully. Failed reads are logged at WARN and skipped. The updates
     * are then filtered against a mutator of the start {@link CdcState}.
     * <p>
     * For each instance whose read did not complete exceptionally, the state's marker is advanced to
     * the highest marker read from that instance. Finally the end state is built (next epoch, this
     * scanner's token range, expired entries purged) and handed, together with the filtered updates,
     * to {@code buildStreamScanner}.
     * <p>
     * {@code futures} is cleared both on normal return and before the size-limit exception below is
     * thrown.
     *
     * @return a stream scanner over the filtered updates, paired with the new end state
     * @throws RuntimeException if the CDC state mutator reports it is full for the configured
     *                          {@code cdcOptions} (i.e. the state exceeds the maximum permitted size)
     */
    public CdcStreamScanner build()
    {
        // block on futures to read all CommitLog mutations and collect CDC updates;
        // failed reads are logged and skipped, and a null result contributes no updates
        List<PartitionUpdateWrapper> updates =
        futures.values()
               .stream()
               .map(future -> FutureUtils.await(future, throwable -> LOGGER.warn("Failed to read instance with error", throwable)))
               .filter(FutureUtils.FutureResult::isSuccess)
               .map(FutureUtils.FutureResult::value)
               .filter(Objects::nonNull)
               .flatMap(Collection::stream)
               .flatMap(f -> f.updates().stream())
               .collect(Collectors.toList());
        stats.mutationsReadPerBatch(updates.size());

        // begin mutating the start state; the filtering step is timed into stats.mutationsFilterTime
        CdcState.Mutator stateMutator = startState.mutate();
        Collection<PartitionUpdateWrapper> filteredUpdates = reportTimeTaken(() -> filterValidUpdates(updates, stateMutator),
                                                                             stats::mutationsFilterTime);

        // report per-table lag: wall-clock now minus the update's max timestamp (micros -> millis)
        long now = System.currentTimeMillis();
        filteredUpdates.forEach(update -> stats.changeReceived(update.keyspace(),
                                                               update.table(),
                                                               now - TimeUnit.MICROSECONDS.toMillis(update.maxTimestampMicros()))
        );

        if (stateMutator.isFull(cdcOptions))
        {
            int cdcStateSize = stateMutator.size();
            // we don't want the CDC state to grow indefinitely; it indicates something is wrong, so fail
            futures.clear();
            LOGGER.error("Watermarker has exceeded max permitted size watermarkerSize={} maxCdcStateSize={}", cdcStateSize, cdcOptions.maxCdcStateSize());
            stats.watermarkerExceededSize(cdcStateSize);
            throw new RuntimeException("Watermark state has exceeded max permitted size: " + cdcStateSize);
        }

        // update CDC state with new marker positions
        // only update marker if we fully read all logs on an instance without errors.
        // A null result is skipped, consistent with the null-filter applied when collecting
        // updates above, rather than failing here with a NullPointerException.
        futures.forEach((instance, future) -> {
            if (!future.isCompletedExceptionally() && future.join() != null)
            {
                future.join()
                      .stream()
                      .map(CommitLogReader.Result::marker)
                      .max(Marker::compareTo)
                      .ifPresent(marker -> stateMutator.advanceMarker(instance, marker));
            }
        });
        futures.clear();

        long timeTakenToReadBatch = System.nanoTime() - startTimeNanos;
        LOGGER.debug("Processed CdcScanner startTimestampMicroseconds={} partitionId={} timeNanos={} updates={}",
                     startTimestampMicroseconds,
                     partitionId,
                     timeTakenToReadBatch,
                     updates.size()
        );
        stats.mutationsBatchReadTime(timeTakenToReadBatch);

        // hand updates over to CdcSortedStreamScanner
        // build new end state, purging expired mutations to prevent the state growing indefinitely
        CdcState endState = stateMutator
                            .nextEpoch()
                            .withRange(tokenRange)
                            .purge(stats, startTimestampMicroseconds)
                            .build();
        return buildStreamScanner(filteredUpdates, endState);
    }