in src/java/org/apache/cassandra/tcm/sequences/Move.java [195:308]
public SequenceState executeNext()
{
switch (next)
{
case START_MOVE:
try
{
ClusterMetadata metadata = ClusterMetadata.current();
logger.info("Moving {} from {} to {}.",
metadata.directory.endpoint(startMove.nodeId()),
metadata.tokenMap.tokens(startMove.nodeId()),
finishMove.newTokens);
ClusterMetadataService.instance().commit(startMove);
}
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
return continuable();
}
break;
case MID_MOVE:
try
{
ClusterMetadata metadata = ClusterMetadata.current();
logger.info("fetching new ranges and streaming old ranges");
StreamPlan streamPlan = new StreamPlan(StreamOperation.RELOCATION);
Keyspaces keyspaces = Schema.instance.getNonLocalStrategyKeyspaces();
Map<ReplicationParams, EndpointsByReplica> movementMap = movementMap(FailureDetector.instance,
metadata.placements,
toSplitRanges,
startMove.delta(),
midMove.delta(),
StorageService.useStrictConsistency)
.asMap();
for (KeyspaceMetadata ks : keyspaces)
{
ReplicationParams replicationParams = ks.params.replication;
if (replicationParams.isMeta() || !StreamPlan.hasNonAccordTables(ks))
continue;
EndpointsByReplica endpoints = movementMap.get(replicationParams);
String[] cfNames = StreamPlan.nonAccordTablesForKeyspace(ks);
for (Map.Entry<Replica, Replica> e : endpoints.flattenEntries())
{
Replica destination = e.getKey();
Replica source = e.getValue();
logger.info("Stream source: {} destination: {}", source, destination);
assert !source.endpoint().equals(destination.endpoint()) : String.format("Source %s should not be the same as destionation %s", source, destination);
if (source.isSelf())
streamPlan.transferRanges(destination.endpoint(), ks.name, RangesAtEndpoint.of(destination), cfNames);
else if (destination.isSelf())
{
if (destination.isFull())
streamPlan.requestRanges(source.endpoint(), ks.name, RangesAtEndpoint.of(destination), RangesAtEndpoint.empty(destination.endpoint()), cfNames);
else
streamPlan.requestRanges(source.endpoint(), ks.name, RangesAtEndpoint.empty(destination.endpoint()), RangesAtEndpoint.of(destination), cfNames);
}
else
throw new IllegalStateException("Node should be either source or destination in the movement map " + endpoints);
}
}
StreamResultFuture streamResult = streamPlan.execute();
Future<Void> accordReady = AccordService.instance().epochReady(metadata.epoch);
FutureCombiner.allOf(streamResult, accordReady).get();
StorageService.instance.repairPaxosForTopologyChange("move");
}
catch (InterruptedException e)
{
return continuable();
}
catch (ExecutionException e)
{
StorageService.instance.markMoveFailed();
throw new RuntimeException("Unable to move", e);
}
catch (Exception e)
{
StorageService.instance.markMoveFailed();
throw e;
}
try
{
ClusterMetadataService.instance().commit(midMove);
}
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
return continuable();
}
break;
case FINISH_MOVE:
ClusterMetadata metadata;
try
{
SystemKeyspace.updateLocalTokens(tokens);
metadata = ClusterMetadataService.instance().commit(finishMove);
}
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
return continuable();
}
ClusterMetadataService.instance().ensureCMSPlacement(metadata);
break;
default:
return error(new IllegalStateException("Can't proceed with join from " + next));
}
return continuable();
}