in cassandra-four-zero-types/src/main/java/org/apache/cassandra/bridge/CassandraSchema.java [368:410]
/**
 * Sets the CDC flag of a table to {@code enableCdc}, doing nothing when the flag already has that value.
 *
 * <p>The table is checked twice: once against the supplied {@code schema} so that a no-op request
 * returns without invoking {@code update(...)}, and once more inside the update callback against the
 * schema instance handed to that callback, before the modified table is loaded.
 * NOTE(review): presumably {@code update(...)} serializes schema mutations, which is why the state is
 * re-validated inside the callback - confirm against its implementation.
 *
 * @param schema    schema instance used for the up-front existence and no-op checks
 * @param keyspace  name of the keyspace that owns the table
 * @param table     name of the table whose CDC flag is changed
 * @param enableCdc desired value of the table's CDC flag
 * @throws IllegalArgumentException if {@code has(schema, keyspace, table)} reports the table as missing
 */
public static void updateCdc(Schema schema,
                             String keyspace,
                             String table,
                             boolean enableCdc)
{
    if (!has(schema, keyspace, table))
    {
        throw new IllegalArgumentException("Keyspace/table not initialized: " + keyspace + "/" + table);
    }

    TableMetadata knownTable = getTable(schema, keyspace, table)
                               .orElseThrow(() -> notExistThrowable(keyspace, table));
    if (knownTable.params.cdc == enableCdc)
    {
        // flag already has the requested value, so skip the update entirely
        return;
    }

    update(live -> {
        // look up both the keyspace and the table before validating either of them
        Optional<KeyspaceMetadata> keyspaceOpt = getKeyspaceMetadata(live, keyspace);
        Optional<TableMetadata> liveTableOpt = getTable(live, keyspace, table);
        if (!(keyspaceOpt.isPresent() && liveTableOpt.isPresent()))
        {
            throw notExistThrowable(keyspace, table);
        }

        KeyspaceMetadata keyspaceMetadata = keyspaceOpt.get();
        TableMetadata liveTable = liveTableOpt.get();
        if (liveTable.params.cdc != enableCdc)
        {
            // rebuild the table with only the CDC param changed, then swap it into the keyspace
            TableMetadata updatedTable = liveTable.unbuild()
                                                  .params(liveTable.params.unbuild().cdc(enableCdc).build())
                                                  .build();
            String action = updatedTable.params.cdc ? "Enabling" : "Disabling";
            LOGGER.info("{} CDC for table keyspace={} table={}", action, keyspace, table);
            live.load(keyspaceMetadata.withSwapped(keyspaceMetadata.tables.withSwapped(updatedTable)));
        }
        // otherwise the flag already matches inside the callback: nothing to update
    });
}