in src/java/org/apache/cassandra/gms/Gossiper.java [2145:2248]
/**
 * Rebuilds the gossip {@link EndpointState} of a node from the supplied cluster metadata and
 * installs it with {@code unsafeUpdateEpStates}.
 * <p>
 * The work is submitted through {@code Gossiper.runInGossipStageBlocking} while {@code taskLock}
 * is held. Any exception raised along the way is logged at WARN and not rethrown.
 *
 * @param nodeId         the node whose gossip state is being rebuilt
 * @param metadata       the cluster metadata the application states are derived from
 * @param tokens         tokens to publish for the node; when {@code null} or empty no TOKENS state is set
 * @param forceHibernate when {@code true}, the status states are set to the hibernate value instead of
 *                       being derived from {@code metadata}
 */
private void mergeNodeToGossip(NodeId nodeId, ClusterMetadata metadata, Collection<Token> tokens, boolean forceHibernate)
{
    taskLock.lock();
    try
    {
        final boolean isLocal = nodeId.equals(metadata.myNodeId());
        final IPartitioner partitioner = metadata.tokenMap.partitioner();
        final NodeAddresses nodeAddresses = metadata.directory.getNodeAddresses(nodeId);
        final Location nodeLocation = metadata.directory.location(nodeId);
        final InetAddressAndPort endpoint = nodeAddresses.broadcastAddress;

        // The local node uses StorageService's factory; any other node gets a factory
        // constructed with a supplier that always yields 0.
        final VersionedValue.VersionedValueFactory factory;
        if (isLocal)
            factory = StorageService.instance.valueFactory;
        else
            factory = new VersionedValue.VersionedValueFactory(partitioner, () -> 0);

        Gossiper.runInGossipStageBlocking(() -> {
            EndpointState known = getEndpointStateForEndpoint(endpoint);
            EndpointState current = known != null ? known : new EndpointState(HeartBeatState.empty());

            Map<ApplicationState, VersionedValue> merged = new EnumMap<>(ApplicationState.class);
            for (ApplicationState appState : ApplicationState.values())
            {
                VersionedValue previous = current.getApplicationState(appState);
                VersionedValue fresh = null;
                switch (appState)
                {
                    case DC:
                        fresh = factory.datacenter(nodeLocation.datacenter);
                        break;
                    case RACK:
                        fresh = factory.rack(nodeLocation.rack);
                        break;
                    case SCHEMA:
                        fresh = factory.schema(metadata.schema.getVersion());
                        break;
                    case RELEASE_VERSION:
                        fresh = factory.releaseVersion(metadata.directory.version(nodeId).cassandraVersion.toString());
                        break;
                    case RPC_ADDRESS:
                        fresh = factory.rpcaddress(endpoint.getAddress());
                        break;
                    case INTERNAL_ADDRESS_AND_PORT:
                        fresh = factory.internalAddressAndPort(nodeAddresses.localAddress);
                        break;
                    case NATIVE_ADDRESS_AND_PORT:
                        fresh = factory.nativeaddressAndPort(nodeAddresses.nativeAddress);
                        break;
                    case HOST_ID:
                    {
                        // While the service is still in GOSSIP state (the upgrade to TCM is not fully
                        // complete), keep gossiping the old host id rather than the node id.
                        UUID hostId;
                        if (ClusterMetadataService.state() == ClusterMetadataService.State.GOSSIP)
                            hostId = metadata.directory.hostId(nodeId);
                        else
                            hostId = nodeId.toUUID();
                        fresh = factory.hostId(hostId);
                        break;
                    }
                    case TOKENS:
                        if (tokens != null && !tokens.isEmpty())
                            fresh = factory.tokens(tokens);
                        break;
                    case STATUS:
                    case STATUS_WITH_PORT:
                        // STATUS is only published/added while there are non-upgraded hosts;
                        // STATUS_WITH_PORT is never subject to this check.
                        if (appState == ApplicationState.STATUS
                            && metadata.directory.versions.values().stream().allMatch(NodeVersion::isUpgraded))
                            break;

                        if (forceHibernate)
                        {
                            // forceHibernate can be true when upgrading from pre-tcm versions: a hibernating
                            // node has no corresponding state in cluster metadata, so the hibernate status
                            // from the pre-upgrade gossip states has to be kept explicitly.
                            fresh = factory.hibernate(true);
                        }
                        else if (!isLocal || StorageService.instance.shouldJoinRing())
                        {
                            // When the local node has shouldJoinRing() == false it was started with
                            // -Dcassandra.join_ring=false and an operator is yet to join it manually via JMX.
                            // StorageService sets the app state to `hibernate` in that case, so nothing is
                            // set here: nodeStateToStatus only considers persistent states (those stored in
                            // ClusterMetadata) and is unaware of transient ones such as hibernate.
                            fresh = GossipHelper.nodeStateToStatus(nodeId, metadata, tokens, factory, previous);
                        }
                        break;
                    default:
                        fresh = previous;
                }

                if (fresh == null)
                    continue;

                // The version has to be > -1 here, otherwise Gossiper#sendAll on generation change
                // does not send the value.
                if (!isLocal)
                    fresh = unsafeMakeVersionedValue(fresh.value, previous == null ? 0 : previous.version);
                merged.put(appState, fresh);
            }

            // Keep the existing generation; only the local node draws a new heartbeat version.
            int generation = current.getHeartBeatState().getGeneration();
            int heartbeatVersion = isLocal ? VersionGenerator.getNextVersion() : 0;
            HeartBeatState heartbeat = new HeartBeatState(generation, heartbeatVersion);
            EndpointState rebuilt = new EndpointState(heartbeat, merged);
            unsafeUpdateEpStates(endpoint, rebuilt);
            logger.debug("Updated epstates for {}: {}", endpoint, rebuilt);
        });
    }
    catch (Exception e)
    {
        logger.warn("Could not merge node {} to gossip", nodeId, e);
    }
    finally
    {
        taskLock.unlock();
    }
}