public StreamScanner getCdcScanner()

in cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java [260:300]


    /**
     * Creates a {@link StreamScanner} for CDC data of the given table by delegating to {@link CdcScannerBuilder}.
     *
     * <p>The table id is resolved through {@code tableIdLookup} and handed, together with the table and
     * partitioner, to a {@link SchemaBuilder}; the builder is needed to initialize the keyspace when it is not
     * already set in the Cassandra schema instance. When a table id was resolved, the method additionally checks
     * that {@code Schema.instance} knows the table metadata and that the keyspace instance has a column family
     * store for that id; a failed check aborts the call before any scanner is built.
     *
     * @param partitionId                forwarded unchanged to the scanner builder
     * @param table                      table whose keyspace and name drive the table id lookup and schema setup
     * @param partitioner                partitioner used for the schema setup and forwarded to the scanner builder
     * @param commitLogProvider          forwarded unchanged to the scanner builder
     * @param tableIdLookup              resolves the table id for the table's keyspace and name; a {@code null}
     *                                   result skips the schema verification
     * @param stats                      forwarded unchanged to the scanner builder
     * @param sparkRangeFilter           optional filter, forwarded unchanged to the scanner builder
     * @param offset                     optional CDC offset filter, forwarded unchanged to the scanner builder
     * @param minimumReplicasPerMutation forwarded unchanged to the scanner builder
     * @param watermarker                forwarded unchanged to the scanner builder
     * @param jobId                      forwarded unchanged to the scanner builder
     * @param executorService            forwarded unchanged to the scanner builder
     * @param timeProvider               forwarded unchanged to the scanner builder
     * @return the scanner produced by {@code CdcScannerBuilder.build()}
     */
    public StreamScanner getCdcScanner(int partitionId,
                                       @NotNull CqlTable table,
                                       @NotNull Partitioner partitioner,
                                       @NotNull CommitLogProvider commitLogProvider,
                                       @NotNull TableIdLookup tableIdLookup,
                                       @NotNull Stats stats,
                                       @Nullable SparkRangeFilter sparkRangeFilter,
                                       @Nullable CdcOffsetFilter offset,
                                       int minimumReplicasPerMutation,
                                       @NotNull Watermarker watermarker,
                                       @NotNull String jobId,
                                       @NotNull ExecutorService executorService,
                                       @NotNull TimeProvider timeProvider)
    {
        final UUID resolvedId = tableIdLookup.lookup(table.keyspace(), table.table());

        // The SchemaBuilder must be created before the checks below: it is what initializes the keyspace
        // when the Cassandra schema instance does not have it yet
        final SchemaBuilder schema = new SchemaBuilder(table, partitioner, resolvedId);

        if (resolvedId != null)
        {
            // A known table id must map to both a TableMetadata and a ColumnFamilyStore in the schema
            final TableId schemaTableId = TableId.fromUUID(resolvedId);
            Preconditions.checkNotNull(Schema.instance.getTableMetadata(schemaTableId),
                                       "Table not initialized in the schema");

            final boolean storeRegistered =
                Objects.requireNonNull(Schema.instance.getKeyspaceInstance(table.keyspace()))
                       .hasColumnFamilyStore(schemaTableId);
            Preconditions.checkArgument(storeRegistered, "ColumnFamilyStore not initialized in the schema");
        }

        final TableMetadata tableMetadata = schema.tableMetaData();
        final CdcScannerBuilder scannerBuilder = new CdcScannerBuilder(partitionId, tableMetadata, partitioner,
                                                                       commitLogProvider, stats,
                                                                       sparkRangeFilter, offset,
                                                                       minimumReplicasPerMutation,
                                                                       watermarker, jobId,
                                                                       executorService, timeProvider);
        return scannerBuilder.build();
    }