public SequenceState executeNext()

in src/java/org/apache/cassandra/tcm/sequences/Move.java [195:308]


    /**
     * Executes the step of the move sequence selected by {@code next}.
     *
     * <ul>
     *   <li>{@code START_MOVE}: logs the move and commits {@code startMove}.</li>
     *   <li>{@code MID_MOVE}: builds and executes a relocation {@link StreamPlan} from the movement map,
     *       waits for both the streaming result and the Accord epoch-ready future, runs the paxos repair
     *       for the topology change, then commits {@code midMove}.</li>
     *   <li>{@code FINISH_MOVE}: updates the locally stored tokens, commits {@code finishMove} and ensures
     *       CMS placement against the resulting metadata.</li>
     * </ul>
     *
     * A failure while committing a step is logged and reported as {@code continuable()} without
     * propagating the exception; the same is returned if the wait in {@code MID_MOVE} is interrupted.
     *
     * @return {@code continuable()} after a step completes, after a commit failure or after an interrupted wait;
     *         {@code error(...)} wrapping an {@link IllegalStateException} if {@code next} is not one of the
     *         steps handled here
     * @throws RuntimeException if waiting on the streaming / epoch-ready futures fails with an
     *         {@link ExecutionException}, or if any other exception escapes the streaming phase; in both
     *         cases the move is first marked as failed
     */
    public SequenceState executeNext()
    {
        switch (next)
        {
            case START_MOVE:
                try
                {
                    ClusterMetadata metadata = ClusterMetadata.current();
                    logger.info("Moving {} from {} to {}.",
                                metadata.directory.endpoint(startMove.nodeId()),
                                metadata.tokenMap.tokens(startMove.nodeId()),
                                finishMove.newTokens);
                    ClusterMetadataService.instance().commit(startMove);
                }
                catch (Throwable t)
                {
                    JVMStabilityInspector.inspectThrowable(t);
                    // Do not drop the cause silently: without it there is no trace of why the step did not advance.
                    logger.warn("Exception committing startMove", t);
                    return continuable();
                }
                break;
            case MID_MOVE:
                try
                {
                    ClusterMetadata metadata = ClusterMetadata.current();
                    logger.info("fetching new ranges and streaming old ranges");
                    StreamPlan streamPlan = new StreamPlan(StreamOperation.RELOCATION);
                    Keyspaces keyspaces = Schema.instance.getNonLocalStrategyKeyspaces();
                    Map<ReplicationParams, EndpointsByReplica> movementMap = movementMap(FailureDetector.instance,
                                                                                         metadata.placements,
                                                                                         toSplitRanges,
                                                                                         startMove.delta(),
                                                                                         midMove.delta(),
                                                                                         StorageService.useStrictConsistency)
                                                                             .asMap();

                    for (KeyspaceMetadata ks : keyspaces)
                    {
                        ReplicationParams replicationParams = ks.params.replication;
                        // Skip meta-replicated keyspaces and keyspaces with nothing but Accord tables.
                        if (replicationParams.isMeta() || !StreamPlan.hasNonAccordTables(ks))
                            continue;

                        EndpointsByReplica endpoints = movementMap.get(replicationParams);

                        String[] cfNames = StreamPlan.nonAccordTablesForKeyspace(ks);
                        // Each entry is keyed by the destination replica; its value is the source replica.
                        for (Map.Entry<Replica, Replica> e : endpoints.flattenEntries())
                        {
                            Replica destination = e.getKey();
                            Replica source = e.getValue();
                            logger.info("Stream source: {} destination: {}", source, destination);
                            assert !source.endpoint().equals(destination.endpoint()) : String.format("Source %s should not be the same as destination %s", source, destination);
                            if (source.isSelf())
                                streamPlan.transferRanges(destination.endpoint(), ks.name, RangesAtEndpoint.of(destination), cfNames);
                            else if (destination.isSelf())
                            {
                                // The destination replica is passed in the first ranges argument when it is
                                // full and in the second one otherwise; the other argument is left empty.
                                if (destination.isFull())
                                    streamPlan.requestRanges(source.endpoint(), ks.name, RangesAtEndpoint.of(destination), RangesAtEndpoint.empty(destination.endpoint()), cfNames);
                                else
                                    streamPlan.requestRanges(source.endpoint(), ks.name, RangesAtEndpoint.empty(destination.endpoint()), RangesAtEndpoint.of(destination), cfNames);
                            }
                            else
                                throw new IllegalStateException("Node should be either source or destination in the movement map " + endpoints);
                        }
                    }

                    StreamResultFuture streamResult = streamPlan.execute();
                    Future<Void> accordReady = AccordService.instance().epochReady(metadata.epoch);
                    // Block until both streaming and the Accord epoch-ready future have completed.
                    FutureCombiner.allOf(streamResult, accordReady).get();
                    StorageService.instance.repairPaxosForTopologyChange("move");
                }
                catch (InterruptedException e)
                {
                    // NOTE(review): the interrupt flag is not restored here and the move is not marked as
                    // failed; confirm callers do not depend on the thread's interrupt status.
                    return continuable();
                }
                catch (ExecutionException e)
                {
                    StorageService.instance.markMoveFailed();
                    throw new RuntimeException("Unable to move", e);
                }
                catch (Exception e)
                {
                    StorageService.instance.markMoveFailed();
                    throw e;
                }

                try
                {
                    ClusterMetadataService.instance().commit(midMove);
                }
                catch (Throwable t)
                {
                    JVMStabilityInspector.inspectThrowable(t);
                    logger.warn("Exception committing midMove", t);
                    return continuable();
                }
                break;
            case FINISH_MOVE:
                ClusterMetadata metadata;
                try
                {
                    SystemKeyspace.updateLocalTokens(tokens);
                    metadata = ClusterMetadataService.instance().commit(finishMove);
                }
                catch (Throwable t)
                {
                    JVMStabilityInspector.inspectThrowable(t);
                    logger.warn("Exception updating local tokens or committing finishMove", t);
                    return continuable();
                }
                ClusterMetadataService.instance().ensureCMSPlacement(metadata);
                break;
            default:
                // This is the move sequence; the message previously (incorrectly) referred to a join.
                return error(new IllegalStateException("Can't proceed with move from " + next));
        }

        return continuable();
    }