public final Result execute()

in src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java [102:252]


    public final Result execute(ClusterMetadata prev)
    {
        Keyspaces newKeyspaces;
        try
        {
            // Applying the schema transformation may produce client warnings. If this is being executed by a follower
            // of the cluster metadata log, there is no client or ClientState, so warning collection is a no-op.
            // When a DDL statement is received from an actual client, the transformation is checked for validation
            // and warnings are captured at that point, before being submitted to the CMS.
            // If the coordinator is a CMS member, then this method will be called as part of committing to the metadata
            // log. In this case, there is a connected client and associated ClientState, so to avoid duplicate warnings
            // pause capture and resume after in applying the schema change.
            schemaTransformation.enterExecution();
            // Guard against an invalid SchemaTransformation supplying a TableMetadata with a future epoch
            newKeyspaces = schemaTransformation.apply(prev);
            newKeyspaces.forEach(ksm -> {
               ksm.tables.forEach(tm -> {
                   if (tm.epoch.isAfter(prev.nextEpoch()))
                       throw new InvalidRequestException(String.format("Invalid schema transformation. " +
                                                                       "Resultant epoch for table metadata of %s.%s (%d) " +
                                                                       "is greater than for cluster metadata (%d)",
                                                                       ksm.name, tm.name, tm.epoch.getEpoch(),
                                                                       prev.nextEpoch().getEpoch()));
               });
            });
        }
        catch (AlreadyExistsException t)
        {
            return new Rejected(ALREADY_EXISTS, t.getMessage());
        }
        catch (ConfigurationException t)
        {
            return new Rejected(CONFIG_ERROR, t.getMessage());
        }
        catch (InvalidRequestException t)
        {
            return new Rejected(INVALID, t.getMessage());
        }
        catch (SyntaxException t)
        {
            return new Rejected(SYNTAX_ERROR, t.getMessage());
        }
        catch (Throwable t)
        {
            JVMStabilityInspector.inspectThrowable(t);
            return new Rejected(SERVER_ERROR, t.getMessage());
        }
        finally
        {
            schemaTransformation.exitExecution();
        }

        Keyspaces.KeyspacesDiff diff = Keyspaces.diff(prev.schema.getKeyspaces(), newKeyspaces);

        // Used to ensure that any new or modified TableMetadata has the correct epoch
        Epoch nextEpoch = prev.nextEpoch();

        // Used to determine whether this schema change impacts data placements in any way.
        // If so, then reject the change if there are data movement operations inflight, i.e. if any ranges are locked.
        // If not, or if no ranges are locked then the change is permitted and placements recalculated as part of this
        // transformation.
        // Impact on data placements is determined as:
        // * Any new keyspace configured with a previously unused set of replication params
        // * Any existing keyspace with an altered set of replication params
        // * Dropping all keyspaces with a specific set of replication params
        Set<KeyspaceMetadata> affectsPlacements = new HashSet<>();
        Map<ReplicationParams, Set<KeyspaceMetadata>> keyspacesByReplication = groupByReplication(prev.schema.getKeyspaces());

        // Scan dropped keyspaces to check if any existing replication scheme will become unused after this change
        Map<ReplicationParams, Set<KeyspaceMetadata>> intendedToDrop = groupByReplication(diff.dropped);
        intendedToDrop.forEach((replication, keyspaces) -> {
            if (keyspaces.containsAll(keyspacesByReplication.get(replication)))
                affectsPlacements.addAll(keyspaces);
        });

        // Scan new keyspaces to check for any new replication schemes and to ensure that the metadata of new tables
        // in those keyspaces has the correct epoch
        for (KeyspaceMetadata newKSM : diff.created)
        {
            if (!keyspacesByReplication.containsKey(newKSM.params.replication))
                affectsPlacements.add(newKSM);

            Tables tables = Tables.of(normaliseTableEpochs(nextEpoch, newKSM.tables.stream()));
            Views views = Views.of(normaliseViewEpochs(nextEpoch, newKSM.views.stream()));
            newKeyspaces = newKeyspaces.withAddedOrUpdated(newKSM.withSwapped(tables).withSwapped(views));
        }

        // Scan modified keyspaces to check for replication changes and to ensure that any modified table metadata
        // has the correct epoch
        for (KeyspaceMetadata.KeyspaceDiff alteredKSM : diff.altered)
        {
            if (!alteredKSM.before.params.replication.equals(alteredKSM.after.params.replication))
                affectsPlacements.add(alteredKSM.before);

            Tables tables = Tables.of(alteredKSM.after.tables);
            for (TableMetadata created : normaliseTableEpochs(nextEpoch, alteredKSM.tables.created.stream()))
                tables = tables.withSwapped(created);

            for (TableMetadata altered : normaliseTableEpochs(nextEpoch, alteredKSM.tables.altered.stream().map(altered -> altered.after)))
                tables = tables.withSwapped(altered);

            Views views = Views.of(alteredKSM.after.views);
            for (ViewMetadata created : normaliseViewEpochs(nextEpoch, alteredKSM.views.created.stream()))
                views = views.withSwapped(created);

            for (ViewMetadata altered : normaliseViewEpochs(nextEpoch, alteredKSM.views.altered.stream().map(altered -> altered.after)))
                views = views.withSwapped(altered);

            newKeyspaces = newKeyspaces.withAddedOrUpdated(alteredKSM.after.withSwapped(tables).withSwapped(views));
        }

        // Changes which affect placement (i.e. new, removed or altered replication settings) are not allowed if there
        // are ongoing range movements, including node replacements and partial joins (nodes in write survey mode).
        if (!affectsPlacements.isEmpty())
        {
            logger.debug("Schema change affects data placements, relevant keyspaces: {}", affectsPlacements);
            if (!prev.lockedRanges.locked.isEmpty())
                return new Rejected(INVALID,
                                    String.format("The requested schema changes cannot be executed as they conflict " +
                                                  "with ongoing range movements. The changes for keyspaces %s are blocked " +
                                                  "by the locked ranges %s",
                                                  affectsPlacements.stream().map(k -> k.name).collect(Collectors.joining(",", "[", "]")),
                                                  prev.lockedRanges.locked));

        }

        DistributedSchema snapshotAfter = new DistributedSchema(newKeyspaces);
        ClusterMetadata.Transformer next = prev.transformer().with(snapshotAfter);
        if (!affectsPlacements.isEmpty())
        {
            // state.schema is a DistributedSchema, so doesn't include local keyspaces. If we don't explicitly include those
            // here, their placements won't be calculated, effectively dropping them from the new versioned state
            Keyspaces allKeyspaces = prev.schema.getKeyspaces().withAddedOrReplaced(snapshotAfter.getKeyspaces());
            DataPlacements calculatedPlacements = ClusterMetadataService.instance()
                                                                       .placementProvider()
                                                                       .calculatePlacements(prev.nextEpoch(), prev.tokenMap.toRanges(), prev, allKeyspaces);

            DataPlacements.Builder newPlacementsBuilder = DataPlacements.builder(calculatedPlacements.size());
            calculatedPlacements.forEach((params, newPlacement) -> {
                DataPlacement previousPlacement = prev.placements.get(params);
                // Preserve placement versioning that has resulted from natural application where possible
                if (previousPlacement.equivalentTo(newPlacement))
                    newPlacementsBuilder.with(params, previousPlacement);
                else
                    newPlacementsBuilder.with(params, newPlacement);
            });
            next = next.with(newPlacementsBuilder.build());
        }
        next = maybeUpdateConsensusMigrationState(prev.consensusMigrationState, next, diff.altered, diff.dropped);
        return Transformation.success(next, LockedRanges.AffectedRanges.EMPTY);
    }