in artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java [284:517]
public void handlePacket(final Packet packet) {
if (packet.getType() == PacketImpl.PING) {
Ping ping = (Ping) packet;
if (config.getConnectionTTLOverride() == -1) {
// Allow clients to specify connection ttl
entry.ttl = ping.getConnectionTTL();
}
// Just send a ping back
channel0.send(packet);
} else if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY
|| packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY_V2) {
SubscribeClusterTopologyUpdatesMessage msg = (SubscribeClusterTopologyUpdatesMessage) packet;
if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY_V2) {
channel0.getConnection().setChannelVersion(
((SubscribeClusterTopologyUpdatesMessageV2) msg).getClientVersion());
}
final ClusterTopologyListener listener = new ClusterTopologyListener() {
@Override
public void nodeUP(final TopologyMember topologyMember, final boolean last) {
try {
final Pair<TransportConfiguration, TransportConfiguration> connectorPair = BackwardsCompatibilityUtils
.checkTCPPairConversion(
channel0.getConnection().getChannelVersion(), topologyMember);
final String nodeID = topologyMember.getNodeId();
// Using an executor as most of the notifications on the Topology
// may come from a channel itself
// What could cause deadlocks
entry.connectionExecutor.execute(new Runnable() {
@Override
public void run() {
if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V4)) {
channel0.send(new ClusterTopologyChangeMessage_V4(
topologyMember.getUniqueEventID(), nodeID,
topologyMember.getBackupGroupName(),
topologyMember.getScaleDownGroupName(), connectorPair,
last, server.getVersion().getIncrementingVersion()));
} else if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V3)) {
channel0.send(new ClusterTopologyChangeMessage_V3(
topologyMember.getUniqueEventID(), nodeID,
topologyMember.getBackupGroupName(),
topologyMember.getScaleDownGroupName(), connectorPair,
last));
} else if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V2)) {
channel0.send(new ClusterTopologyChangeMessage_V2(
topologyMember.getUniqueEventID(), nodeID,
topologyMember.getBackupGroupName(), connectorPair,
last));
} else {
channel0.send(
new ClusterTopologyChangeMessage(nodeID, connectorPair,
last));
}
}
});
} catch (RejectedExecutionException ignored) {
logger.debug(ignored.getMessage(), ignored);
// this could happen during a shutdown and we don't care, if we lost a nodeDown during a shutdown
// what can we do anyways?
}
}
@Override
public void nodeDown(final long uniqueEventID, final String nodeID) {
// Using an executor as most of the notifications on the Topology
// may come from a channel itself
// What could cause deadlocks
try {
entry.connectionExecutor.execute(new Runnable() {
@Override
public void run() {
if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V2)) {
channel0.send(
new ClusterTopologyChangeMessage_V2(uniqueEventID,
nodeID));
} else {
channel0.send(new ClusterTopologyChangeMessage(nodeID));
}
}
});
} catch (RejectedExecutionException ignored) {
// this could happen during a shutdown and we don't care, if we lost a nodeDown during a shutdown
// what can we do anyways?
}
}
@Override
public String toString() {
return "Remote Proxy on channel " + Integer
.toHexString(System.identityHashCode(this));
}
};
if (acceptorUsed.getClusterConnection() != null) {
acceptorUsed.getClusterConnection().addClusterTopologyListener(listener);
rc.addCloseListener(new CloseListener() {
@Override
public void connectionClosed() {
acceptorUsed.getClusterConnection()
.removeClusterTopologyListener(listener);
}
});
} else {
// if not clustered, we send a single notification to the client containing the node-id where the server is connected to
// This is done this way so Recovery discovery could also use the node-id for non-clustered setups
entry.connectionExecutor.execute(new Runnable() {
@Override
public void run() {
String nodeId = server.getNodeID().toString();
Pair<TransportConfiguration, TransportConfiguration> emptyConfig = new Pair<>(
null, null);
if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V4)) {
channel0.send(new ClusterTopologyChangeMessage_V4(System.currentTimeMillis(), nodeId,
null, null, emptyConfig, true, server.getVersion().getIncrementingVersion()));
} else if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V2)) {
channel0.send(
new ClusterTopologyChangeMessage_V2(System.currentTimeMillis(),
nodeId, null, emptyConfig, true));
} else {
channel0.send(
new ClusterTopologyChangeMessage(nodeId, emptyConfig, true));
}
}
});
}
} else if (packet.getType() == PacketImpl.FEDERATION_DOWNSTREAM_CONNECT) {
//If we receive this packet then a remote broker is requesting us to create federated upstream connection
//back to it which simulates a downstream connection
final FederationDownstreamConnectMessage message = (FederationDownstreamConnectMessage) packet;
final FederationDownstreamConfiguration downstreamConfiguration = message.getStreamConfiguration();
//Create a new Upstream Federation configuration based on the received Downstream connection message
//from the remote broker
//The idea here is to set all the same configuration parameters that apply to the upstream connection
final FederationConfiguration config = new FederationConfiguration();
config.setName(message.getName() + FederationDownstreamConnectMessage.UPSTREAM_SUFFIX);
config.setCredentials(message.getCredentials());
//Add the policy map configuration
for (FederationPolicy policy : message.getFederationPolicyMap().values()) {
config.addFederationPolicy(policy);
}
//Add any transformer configurations
for (FederationTransformerConfiguration transformerConfiguration : message.getTransformerConfigurationMap().values()) {
config.addTransformerConfiguration(transformerConfiguration);
}
//Create an upstream configuration with the same name but apply the upstream suffix so it is unique
final FederationUpstreamConfiguration upstreamConfiguration = new FederationUpstreamConfiguration()
.setName(downstreamConfiguration.getName() + FederationDownstreamConnectMessage.UPSTREAM_SUFFIX)
.addPolicyRefs(downstreamConfiguration.getPolicyRefs());
//Use the provided Transport Configuration information to create an upstream connection back to the broker that
//created the downstream connection
final TransportConfiguration upstreamConfig = downstreamConfiguration.getUpstreamConfiguration();
//Initialize the upstream transport with the config from the acceptor as this will apply
//relevant settings such as SSL, then override with settings from the downstream config
final Map<String, Object> params = new HashMap<>(acceptorUsed.getConfiguration());
if (upstreamConfig.getParams() != null) {
params.putAll(upstreamConfig.getParams());
}
final Map<String, Object> extraParams = new HashMap<>();
if (upstreamConfig.getExtraParams() != null) {
extraParams.putAll(upstreamConfig.getExtraParams());
}
//Add the new upstream configuration that was created so we can connect back to the downstream server
final TransportConfiguration upstreamConf = new TransportConfiguration(
upstreamConfig.getFactoryClassName(), params, upstreamConfig.getName() + FederationDownstreamConnectMessage.UPSTREAM_SUFFIX,
extraParams);
server.getConfiguration()
.addConnectorConfiguration(upstreamConf.getName() + FederationDownstreamConnectMessage.UPSTREAM_SUFFIX, upstreamConf);
//Create a new upstream connection config based on the downstream configuration
FederationConnectionConfiguration downstreamConConf = downstreamConfiguration.getConnectionConfiguration();
FederationConnectionConfiguration upstreamConConf = upstreamConfiguration.getConnectionConfiguration();
List<String> connectorNames = new ArrayList<>();
connectorNames.add(upstreamConf.getName() + FederationDownstreamConnectMessage.UPSTREAM_SUFFIX);
//Configure all of the upstream connection parameters from the downstream connection that are relevant
//Note that HA and discoveryGroupName are skipped because the downstream connection will manage that
//In this case we just want to create a connection back to the broker that sent the downstream packet.
//If this broker goes down then the original broker (if configured with HA) will re-establish a new
//connection to another broker which will then create another upstream, etc
upstreamConConf.setStaticConnectors(connectorNames);
upstreamConConf.setUsername(downstreamConConf.getUsername());
upstreamConConf.setPassword(downstreamConConf.getPassword());
upstreamConConf.setShareConnection(downstreamConConf.isShareConnection());
upstreamConConf.setPriorityAdjustment(downstreamConConf.getPriorityAdjustment());
upstreamConConf.setClientFailureCheckPeriod(downstreamConConf.getClientFailureCheckPeriod());
upstreamConConf.setConnectionTTL(downstreamConConf.getConnectionTTL());
upstreamConConf.setRetryInterval(downstreamConConf.getRetryInterval());
upstreamConConf.setRetryIntervalMultiplier(downstreamConConf.getRetryIntervalMultiplier());
upstreamConConf.setMaxRetryInterval(downstreamConConf.getMaxRetryInterval());
upstreamConConf.setInitialConnectAttempts(downstreamConConf.getInitialConnectAttempts());
upstreamConConf.setReconnectAttempts(downstreamConConf.getReconnectAttempts());
upstreamConConf.setCallTimeout(downstreamConConf.getCallTimeout());
upstreamConConf.setCallFailoverTimeout(downstreamConConf.getCallFailoverTimeout());
config.addUpstreamConfiguration(upstreamConfiguration);
//Register close and failure listeners, if the initial downstream connection goes down then we
//want to terminate the upstream connection
rc.addCloseListener(() -> {
server.getFederationManager().undeploy(config.getName());
});
rc.addFailureListener(new FailureListener() {
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver) {
server.getFederationManager().undeploy(config.getName());
}
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver,
String scaleDownTargetNodeID) {
server.getFederationManager().undeploy(config.getName());
}
});
try {
server.getFederationManager().deploy(config);
} catch (Exception e) {
logger.error("Error deploying federation", e);
}
}
}