in src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java [102:252]
/**
 * Applies the wrapped {@code schemaTransformation} to the schema held by {@code prev} and, if the result is valid,
 * builds the next cluster metadata state.
 * <ol>
 *   <li>Apply the transformation. Any exception it throws, or a table whose metadata claims an epoch later than the
 *       next cluster metadata epoch, is converted into a {@code Rejected} result with a matching exception code.</li>
 *   <li>Diff the previous and new keyspaces and stamp every created or altered table/view with the next epoch.</li>
 *   <li>Determine which keyspaces affect data placements. If any do and ranges are currently locked, reject.</li>
 *   <li>Recalculate placements when required (reusing equivalent previous placements to keep their versioning) and
 *       update the consensus migration state.</li>
 * </ol>
 *
 * @param prev the cluster metadata to transform
 * @return a successful result carrying the new metadata, or a {@code Rejected} result describing why the change
 *         cannot be applied
 */
public final Result execute(ClusterMetadata prev)
{
    // The epoch the resulting metadata will carry. New/modified table and view metadata is stamped with it, and no
    // table metadata produced by the transformation may claim an epoch after it.
    Epoch nextEpoch = prev.nextEpoch();
    Keyspaces newKeyspaces;
    try
    {
        // Applying the schema transformation may produce client warnings. If this is being executed by a follower
        // of the cluster metadata log, there is no client or ClientState, so warning collection is a no-op.
        // When a DDL statement is received from an actual client, the transformation is checked for validation
        // and warnings are captured at that point, before being submitted to the CMS.
        // If the coordinator is a CMS member, then this method will be called as part of committing to the metadata
        // log. In this case, there is a connected client and associated ClientState, so to avoid duplicate warnings
        // capture is paused while the schema change is applied and resumed afterwards (see the finally block).
        schemaTransformation.enterExecution();
        newKeyspaces = schemaTransformation.apply(prev);
        // Guard against an invalid SchemaTransformation supplying a TableMetadata with a future epoch
        newKeyspaces.forEach(ksm -> {
            ksm.tables.forEach(tm -> {
                if (tm.epoch.isAfter(nextEpoch))
                    throw new InvalidRequestException(String.format("Invalid schema transformation. " +
                                                                    "Resultant epoch for table metadata of %s.%s (%d) " +
                                                                    "is greater than for cluster metadata (%d)",
                                                                    ksm.name, tm.name, tm.epoch.getEpoch(),
                                                                    nextEpoch.getEpoch()));
            });
        });
    }
    catch (AlreadyExistsException t)
    {
        return new Rejected(ALREADY_EXISTS, t.getMessage());
    }
    catch (ConfigurationException t)
    {
        return new Rejected(CONFIG_ERROR, t.getMessage());
    }
    catch (InvalidRequestException t)
    {
        return new Rejected(INVALID, t.getMessage());
    }
    catch (SyntaxException t)
    {
        return new Rejected(SYNTAX_ERROR, t.getMessage());
    }
    catch (Throwable t)
    {
        JVMStabilityInspector.inspectThrowable(t);
        // Throwable.getMessage() may be null (e.g. a bare NullPointerException from a faulty transformation); fall
        // back to the exception type so the rejection always carries a meaningful, non-null reason.
        String reason = t.getMessage() != null ? t.getMessage() : t.getClass().getName();
        return new Rejected(SERVER_ERROR, reason);
    }
    finally
    {
        schemaTransformation.exitExecution();
    }

    Keyspaces.KeyspacesDiff diff = Keyspaces.diff(prev.schema.getKeyspaces(), newKeyspaces);

    // Used to determine whether this schema change impacts data placements in any way.
    // If so, then reject the change if there are data movement operations inflight, i.e. if any ranges are locked.
    // If not, or if no ranges are locked then the change is permitted and placements recalculated as part of this
    // transformation.
    // Impact on data placements is determined as:
    // * Any new keyspace configured with a previously unused set of replication params
    // * Any existing keyspace with an altered set of replication params
    // * Dropping all keyspaces with a specific set of replication params
    Set<KeyspaceMetadata> affectsPlacements = new HashSet<>();
    Map<ReplicationParams, Set<KeyspaceMetadata>> keyspacesByReplication = groupByReplication(prev.schema.getKeyspaces());

    // Scan dropped keyspaces to check if any existing replication scheme will become unused after this change
    Map<ReplicationParams, Set<KeyspaceMetadata>> intendedToDrop = groupByReplication(diff.dropped);
    intendedToDrop.forEach((replication, keyspaces) -> {
        Set<KeyspaceMetadata> existing = keyspacesByReplication.get(replication);
        // Dropped keyspaces presumably all come from prev's schema, so 'existing' should be non-null; a missing entry
        // is treated conservatively as "no keyspace would remain with these params" rather than failing with an NPE.
        if (existing == null || keyspaces.containsAll(existing))
            affectsPlacements.addAll(keyspaces);
    });

    // Scan new keyspaces to check for any new replication schemes and to ensure that the metadata of new tables
    // in those keyspaces has the correct epoch
    for (KeyspaceMetadata newKSM : diff.created)
    {
        if (!keyspacesByReplication.containsKey(newKSM.params.replication))
            affectsPlacements.add(newKSM);
        Tables tables = Tables.of(normaliseTableEpochs(nextEpoch, newKSM.tables.stream()));
        Views views = Views.of(normaliseViewEpochs(nextEpoch, newKSM.views.stream()));
        newKeyspaces = newKeyspaces.withAddedOrUpdated(newKSM.withSwapped(tables).withSwapped(views));
    }

    // Scan modified keyspaces to check for replication changes and to ensure that any modified table metadata
    // has the correct epoch
    for (KeyspaceMetadata.KeyspaceDiff alteredKSM : diff.altered)
    {
        if (!alteredKSM.before.params.replication.equals(alteredKSM.after.params.replication))
            affectsPlacements.add(alteredKSM.before);
        Tables tables = Tables.of(alteredKSM.after.tables);
        for (TableMetadata created : normaliseTableEpochs(nextEpoch, alteredKSM.tables.created.stream()))
            tables = tables.withSwapped(created);
        for (TableMetadata altered : normaliseTableEpochs(nextEpoch, alteredKSM.tables.altered.stream().map(altered -> altered.after)))
            tables = tables.withSwapped(altered);
        Views views = Views.of(alteredKSM.after.views);
        for (ViewMetadata created : normaliseViewEpochs(nextEpoch, alteredKSM.views.created.stream()))
            views = views.withSwapped(created);
        for (ViewMetadata altered : normaliseViewEpochs(nextEpoch, alteredKSM.views.altered.stream().map(altered -> altered.after)))
            views = views.withSwapped(altered);
        newKeyspaces = newKeyspaces.withAddedOrUpdated(alteredKSM.after.withSwapped(tables).withSwapped(views));
    }

    // Changes which affect placement (i.e. new, removed or altered replication settings) are not allowed if there
    // are ongoing range movements, including node replacements and partial joins (nodes in write survey mode).
    if (!affectsPlacements.isEmpty())
    {
        logger.debug("Schema change affects data placements, relevant keyspaces: {}", affectsPlacements);
        if (!prev.lockedRanges.locked.isEmpty())
            return new Rejected(INVALID,
                                String.format("The requested schema changes cannot be executed as they conflict " +
                                              "with ongoing range movements. The changes for keyspaces %s are blocked " +
                                              "by the locked ranges %s",
                                              affectsPlacements.stream().map(k -> k.name).collect(Collectors.joining(",", "[", "]")),
                                              prev.lockedRanges.locked));
    }

    DistributedSchema snapshotAfter = new DistributedSchema(newKeyspaces);
    ClusterMetadata.Transformer next = prev.transformer().with(snapshotAfter);
    if (!affectsPlacements.isEmpty())
    {
        // state.schema is a DistributedSchema, so doesn't include local keyspaces. If we don't explicitly include those
        // here, their placements won't be calculated, effectively dropping them from the new versioned state.
        // NOTE(review): withAddedOrReplaced presumably does not remove entries, so keyspaces dropped by this change
        // would still be present here and keep contributing placements for their replication params — confirm intended.
        Keyspaces allKeyspaces = prev.schema.getKeyspaces().withAddedOrReplaced(snapshotAfter.getKeyspaces());
        DataPlacements calculatedPlacements = ClusterMetadataService.instance()
                                                                    .placementProvider()
                                                                    .calculatePlacements(nextEpoch, prev.tokenMap.toRanges(), prev, allKeyspaces);
        DataPlacements.Builder newPlacementsBuilder = DataPlacements.builder(calculatedPlacements.size());
        calculatedPlacements.forEach((params, newPlacement) -> {
            DataPlacement previousPlacement = prev.placements.get(params);
            // Preserve placement versioning that has resulted from natural application where possible
            if (previousPlacement.equivalentTo(newPlacement))
                newPlacementsBuilder.with(params, previousPlacement);
            else
                newPlacementsBuilder.with(params, newPlacement);
        });
        next = next.with(newPlacementsBuilder.build());
    }
    next = maybeUpdateConsensusMigrationState(prev.consensusMigrationState, next, diff.altered, diff.dropped);
    return Transformation.success(next, LockedRanges.AffectedRanges.EMPTY);
}