in src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java [61:181]
public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next, boolean fromSnapshot)
{
if (!fromSnapshot &&
next.directory.lastModified().equals(prev.directory.lastModified()) &&
next.tokenMap.lastModified().equals(prev.tokenMap.lastModified()))
return;
Set<InetAddressAndPort> removedAddr = Sets.difference(new HashSet<>(prev.directory.allAddresses()),
new HashSet<>(next.directory.allAddresses()));
Set<NodeId> changed = new HashSet<>();
for (NodeId node : next.directory.peerIds())
{
if (directoryEntryChangedFor(node, prev.directory, next.directory) || !prev.tokenMap.tokens(node).equals(next.tokenMap.tokens(node)))
changed.add(node);
}
for (InetAddressAndPort remove : removedAddr)
{
GossipHelper.evictFromMembership(remove);
PeersTable.removeFromSystemPeersTables(remove);
}
for (NodeId change : changed)
{
// next.myNodeId() can be null during replay (before we have registered)
if (next.myNodeId() != null && next.myNodeId().equals(change))
{
switch (next.directory.peerState(change))
{
case BOOTSTRAPPING:
if (prev.directory.peerState(change) != BOOTSTRAPPING)
{
// legacy log messages for tests
logger.info("JOINING: Starting to bootstrap");
logger.info("JOINING: calculation complete, ready to bootstrap");
}
break;
case BOOT_REPLACING:
case REGISTERED:
break;
case JOINED:
SystemKeyspace.updateTokens(next.directory.endpoint(change), next.tokenMap.tokens(change));
// needed if we miss the REGISTERED above; Does nothing if we are already in epStateMap:
Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration());
StreamSupport.stream(ColumnFamilyStore.all().spliterator(), false)
.filter(cfs -> Schema.instance.getUserKeyspaces().names().contains(cfs.keyspace.getName()))
.forEach(cfs -> cfs.indexManager.executePreJoinTasksBlocking(true));
if (prev.directory.peerState(change) == MOVING)
logger.info("Node {} state jump to NORMAL", next.directory.endpoint(change));
break;
}
// Maybe intitialise local epstate whatever the node state because we could be processing after a
// replay and so may have not seen any previous local states, making this the first mutation of gossip
// state for the local node.
Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration());
Gossiper.instance.addLocalApplicationState(SCHEMA, StorageService.instance.valueFactory.schema(next.schema.getVersion()));
// if the local node's location has changed, update system.local.
if (!next.directory.location(change).equals(prev.directory.location(change)))
SystemKeyspace.updateLocation(next.directory.location(change));
}
if (next.directory.peerState(change) == REGISTERED)
{
// Re-establish any connections made prior to this node registering
InetAddressAndPort endpoint = next.directory.endpoint(change);
logger.info("Peer with address {} has registered, interrupting any previously established connections", endpoint);
MessagingService.instance().interruptOutbound(endpoint);
}
else if (next.directory.peerState(change) == LEFT)
{
Gossiper.instance.mergeNodeToGossip(change, next, prev.tokenMap.tokens(change));
InetAddressAndPort endpoint = prev.directory.endpoint(change);
if (endpoint != null)
{
PeersTable.updateLegacyPeerTable(change, prev, next);
if (!endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
GossipHelper.removeFromGossip(endpoint);
}
}
else if(next.directory.peerState(change) == MOVING)
{
// legacy log messages for tests
logger.debug("Node {} state MOVING, tokens {}", next.directory.endpoint(change), prev.tokenMap.tokens(change));
Gossiper.instance.mergeNodeToGossip(change, next);
PeersTable.updateLegacyPeerTable(change, prev, next);
}
else if (NodeState.isBootstrap(next.directory.peerState(change)))
{
// For compatibility with clients, ensure we set TOKENS for bootstrapping nodes in gossip.
// As these are not yet added to the token map they must be extracted from the in progress sequence.
Collection<Token> tokens = GossipHelper.getTokensFromOperation(change, next);
Gossiper.instance.mergeNodeToGossip(change, next, tokens);
}
else if (prev.directory.peerState(change) == BOOT_REPLACING)
{
// legacy log message for compatibility (& tests)
MultiStepOperation<?> sequence = prev.inProgressSequences.get(change);
if (sequence != null && sequence.kind() == MultiStepOperation.Kind.REPLACE)
{
BootstrapAndReplace replace = (BootstrapAndReplace) sequence;
InetAddressAndPort replaced = prev.directory.endpoint(replace.startReplace.replaced());
InetAddressAndPort replacement = prev.directory.endpoint(change);
Collection<Token> tokens = GossipHelper.getTokensFromOperation(replace);
logger.info("Node {} will complete replacement of {} for tokens {}", replacement, replaced, tokens);
if (!replacement.equals(replaced))
{
for (Token token : tokens)
logger.warn("Token {} changing ownership from {} to {}", token, replaced, replacement);
}
Gossiper.instance.mergeNodeToGossip(change, next, tokens);
PeersTable.updateLegacyPeerTable(change, prev, next);
}
}
else
{
Gossiper.instance.mergeNodeToGossip(change, next);
PeersTable.updateLegacyPeerTable(change, prev, next);
}
}
}