public static void updateCdcSchema()

in cassandra-four-zero-types/src/main/java/org/apache/cassandra/bridge/CassandraSchema.java [287:342]


    /**
     * Reconciles the CDC state of {@code schema} with the requested set of CDC tables.
     *
     * <p>For every requested table its UDTs are registered and its table id is looked up, then one of
     * three paths is taken:
     * <ul>
     *   <li>the table is already listed as CDC enabled: its schema is refreshed if needed;</li>
     *   <li>the table exists in the schema without CDC: its schema is refreshed with CDC turned on;</li>
     *   <li>the table is unknown: it is built from scratch with CDC turned on.</li>
     * </ul>
     * Each path verifies the outcome through {@code Preconditions} checks (the new-table path only when a
     * table id was resolved). Finally, CDC is disabled on every table that was CDC enabled beforehand but
     * is absent from {@code cdcTables}.
     *
     * @param schema        schema instance to inspect and update
     * @param cdcTables     tables that should have CDC enabled once this method returns
     * @param partitioner   partitioner handed to the schema update / schema builder
     * @param tableIdLookup resolves the table id for a keyspace/table pair; a {@code null} result is tolerated
     */
    public static void updateCdcSchema(@NotNull Schema schema,
                                       @NotNull Set<CqlTable> cdcTables,
                                       @NotNull Partitioner partitioner,
                                       @NotNull TableIdLookup tableIdLookup)
    {
        String requestedTables = cdcTables.stream()
                                          .map(t -> String.format("%s.%s", t.keyspace(), t.table()))
                                          .collect(Collectors.joining(","));
        LOGGER.info("Updating CDC schema tables='{}'", requestedTables);

        // Starts as "everything currently CDC enabled"; requested tables are struck off as they are
        // processed, so whatever is left after the loop must have CDC switched off.
        Map<String, Set<String>> staleCdcTables = CassandraSchema.cdcEnabledTables(schema);

        for (CqlTable cqlTable : cdcTables)
        {
            cqlTable.udts().forEach(type -> CassandraTypesImplementation.INSTANCE.updateUDTs(cqlTable.keyspace(), type));

            UUID tableId = tableIdLookup.lookup(cqlTable.keyspace(), cqlTable.table());
            boolean cdcAlreadyEnabled = staleCdcTables.containsKey(cqlTable.keyspace())
                                        && staleCdcTables.get(cqlTable.keyspace()).contains(cqlTable.table());

            if (cdcAlreadyEnabled)
            {
                // CDC is on already: keep it on and pick up any schema change
                LOGGER.info("CDC already enabled keyspace={} table={}", cqlTable.keyspace(), cqlTable.table());
                staleCdcTables.get(cqlTable.keyspace()).remove(cqlTable.table());
                CassandraSchema.maybeUpdateSchema(schema, partitioner, cqlTable, tableId, true);
                Preconditions.checkArgument(CassandraSchema.isCdcEnabled(schema, cqlTable),
                                            "CDC not enabled for table: " + cqlTable.keyspace() + "." + cqlTable.table());
            }
            else if (CassandraSchema.has(schema, cqlTable))
            {
                // table is known to the schema but CDC is off: refresh the schema with cdc = true
                LOGGER.info("Enabling CDC on existing table keyspace={} table={}", cqlTable.keyspace(), cqlTable.table());
                CassandraSchema.maybeUpdateSchema(schema, partitioner, cqlTable, tableId, true);
                Preconditions.checkArgument(CassandraSchema.isCdcEnabled(schema, cqlTable),
                                            "CDC not enabled for table: " + cqlTable.keyspace() + "." + cqlTable.table());
            }
            else
            {
                // table is not in the schema yet: build it with cdc = true
                LOGGER.info("Adding new CDC enabled table keyspace={} table={}", cqlTable.keyspace(), cqlTable.table());
                // the builder is instantiated purely for its effect on the schema; the instance is discarded
                new SchemaBuilder(cqlTable, partitioner, tableId, true);
                if (tableId != null)
                {
                    // confirm that TableMetadata and ColumnFamilyStore were registered and CDC is on
                    TableId registeredId = TableId.fromUUID(tableId);
                    Preconditions.checkNotNull(schema.getTableMetadata(registeredId), "Table not initialized in the schema");
                    Preconditions.checkArgument(Objects.requireNonNull(schema.getKeyspaceInstance(cqlTable.keyspace())).hasColumnFamilyStore(registeredId),
                                                "ColumnFamilyStore not initialized in the schema");
                    Preconditions.checkArgument(CassandraSchema.isCdcEnabled(schema, cqlTable),
                                                "CDC not enabled for table: " + cqlTable.keyspace() + "." + cqlTable.table());
                }
            }
        }

        // anything still listed was CDC enabled before but is no longer requested, so turn CDC off
        for (Map.Entry<String, Set<String>> leftover : staleCdcTables.entrySet())
        {
            for (String tableName : leftover.getValue())
            {
                LOGGER.warn("Disabling CDC on table keyspace={} table={}", leftover.getKey(), tableName);
                CassandraSchema.disableCdc(schema, leftover.getKey(), tableName);
            }
        }
    }