in cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java [260:300]
public StreamScanner getCdcScanner(int partitionId,
@NotNull CqlTable table,
@NotNull Partitioner partitioner,
@NotNull CommitLogProvider commitLogProvider,
@NotNull TableIdLookup tableIdLookup,
@NotNull Stats stats,
@Nullable SparkRangeFilter sparkRangeFilter,
@Nullable CdcOffsetFilter offset,
int minimumReplicasPerMutation,
@NotNull Watermarker watermarker,
@NotNull String jobId,
@NotNull ExecutorService executorService,
@NotNull TimeProvider timeProvider)
{
// NOTE: Need to use SchemaBuilder to init keyspace if not already set in Cassandra schema instance
UUID tableId = tableIdLookup.lookup(table.keyspace(), table.table());
SchemaBuilder schemaBuilder = new SchemaBuilder(table, partitioner, tableId);
if (tableId != null)
{
// Verify TableMetadata and ColumnFamilyStore initialized in Schema
TableId tableIdAfter = TableId.fromUUID(tableId);
Preconditions.checkNotNull(Schema.instance.getTableMetadata(tableIdAfter),
"Table not initialized in the schema");
Preconditions.checkArgument(Objects.requireNonNull(Schema.instance.getKeyspaceInstance(table.keyspace()))
.hasColumnFamilyStore(tableIdAfter),
"ColumnFamilyStore not initialized in the schema");
}
TableMetadata metadata = schemaBuilder.tableMetaData();
return new CdcScannerBuilder(partitionId,
metadata,
partitioner,
commitLogProvider,
stats,
sparkRangeFilter,
offset,
minimumReplicasPerMutation,
watermarker,
jobId,
executorService,
timeProvider).build();
}