in cassandra-four-zero-types/src/main/java/org/apache/cassandra/bridge/CassandraSchema.java [287:342]
public static void updateCdcSchema(@NotNull Schema schema,
@NotNull Set<CqlTable> cdcTables,
@NotNull Partitioner partitioner,
@NotNull TableIdLookup tableIdLookup)
{
LOGGER.info("Updating CDC schema tables='{}'",
cdcTables.stream()
.map(t -> String.format("%s.%s", t.keyspace(), t.table()))
.collect(Collectors.joining(",")));
Map<String, Set<String>> cdcEnabledTables = CassandraSchema.cdcEnabledTables(schema);
for (CqlTable table : cdcTables)
{
table.udts().forEach(udt -> CassandraTypesImplementation.INSTANCE.updateUDTs(table.keyspace(), udt));
UUID tableId = tableIdLookup.lookup(table.keyspace(), table.table());
if (cdcEnabledTables.containsKey(table.keyspace()) && cdcEnabledTables.get(table.keyspace()).contains(table.table()))
{
// table has cdc enabled already, update schema if it has changed
LOGGER.info("CDC already enabled keyspace={} table={}", table.keyspace(), table.table());
cdcEnabledTables.get(table.keyspace()).remove(table.table());
CassandraSchema.maybeUpdateSchema(schema, partitioner, table, tableId, true);
Preconditions.checkArgument(CassandraSchema.isCdcEnabled(schema, table),
"CDC not enabled for table: " + table.keyspace() + "." + table.table());
continue;
}
if (CassandraSchema.has(schema, table))
{
// update schema if changed for existing table
LOGGER.info("Enabling CDC on existing table keyspace={} table={}", table.keyspace(), table.table());
CassandraSchema.maybeUpdateSchema(schema, partitioner, table, tableId, true);
Preconditions.checkArgument(CassandraSchema.isCdcEnabled(schema, table),
"CDC not enabled for table: " + table.keyspace() + "." + table.table());
continue;
}
// new table so initialize table with cdc = true
LOGGER.info("Adding new CDC enabled table keyspace={} table={}", table.keyspace(), table.table());
new SchemaBuilder(table, partitioner, tableId, true);
if (tableId != null)
{
// verify TableMetadata and ColumnFamilyStore initialized in Schema
TableId tableIdAfter = TableId.fromUUID(tableId);
Preconditions.checkNotNull(schema.getTableMetadata(tableIdAfter), "Table not initialized in the schema");
Preconditions.checkArgument(Objects.requireNonNull(schema.getKeyspaceInstance(table.keyspace())).hasColumnFamilyStore(tableIdAfter),
"ColumnFamilyStore not initialized in the schema");
Preconditions.checkArgument(CassandraSchema.isCdcEnabled(schema, table),
"CDC not enabled for table: " + table.keyspace() + "." + table.table());
}
}
// existing table no longer with cdc = true, so disable
cdcEnabledTables.forEach((ks, tables) -> tables.forEach(table -> {
LOGGER.warn("Disabling CDC on table keyspace={} table={}", ks, table);
CassandraSchema.disableCdc(schema, ks, table);
}));
}