public void handlePacket()

in artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java [284:517]


      public void handlePacket(final Packet packet) {
         if (packet.getType() == PacketImpl.PING) {
            Ping ping = (Ping) packet;

            if (config.getConnectionTTLOverride() == -1) {
               // Allow clients to specify connection ttl
               entry.ttl = ping.getConnectionTTL();
            }

            // Just send a ping back
            channel0.send(packet);
         } else if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY
            || packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY_V2) {
            SubscribeClusterTopologyUpdatesMessage msg = (SubscribeClusterTopologyUpdatesMessage) packet;

            if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY_V2) {
               channel0.getConnection().setChannelVersion(
                  ((SubscribeClusterTopologyUpdatesMessageV2) msg).getClientVersion());
            }

            final ClusterTopologyListener listener = new ClusterTopologyListener() {
               @Override
               public void nodeUP(final TopologyMember topologyMember, final boolean last) {
                  try {
                     final Pair<TransportConfiguration, TransportConfiguration> connectorPair = BackwardsCompatibilityUtils
                        .checkTCPPairConversion(
                           channel0.getConnection().getChannelVersion(), topologyMember);

                     final String nodeID = topologyMember.getNodeId();
                     // Using an executor as most of the notifications on the Topology
                     // may come from a channel itself
                     // What could cause deadlocks
                     entry.connectionExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                           if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V4)) {
                              channel0.send(new ClusterTopologyChangeMessage_V4(
                                 topologyMember.getUniqueEventID(), nodeID,
                                 topologyMember.getBackupGroupName(),
                                 topologyMember.getScaleDownGroupName(), connectorPair,
                                 last, server.getVersion().getIncrementingVersion()));
                           } else if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V3)) {
                              channel0.send(new ClusterTopologyChangeMessage_V3(
                                 topologyMember.getUniqueEventID(), nodeID,
                                 topologyMember.getBackupGroupName(),
                                 topologyMember.getScaleDownGroupName(), connectorPair,
                                 last));
                           } else if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V2)) {
                              channel0.send(new ClusterTopologyChangeMessage_V2(
                                 topologyMember.getUniqueEventID(), nodeID,
                                 topologyMember.getBackupGroupName(), connectorPair,
                                 last));
                           } else {
                              channel0.send(
                                 new ClusterTopologyChangeMessage(nodeID, connectorPair,
                                                                  last));
                           }
                        }
                     });
                  } catch (RejectedExecutionException ignored) {
                     logger.debug(ignored.getMessage(), ignored);
                     // this could happen during a shutdown and we don't care, if we lost a nodeDown during a shutdown
                     // what can we do anyways?
                  }

               }

               @Override
               public void nodeDown(final long uniqueEventID, final String nodeID) {
                  // Using an executor as most of the notifications on the Topology
                  // may come from a channel itself
                  // What could cause deadlocks
                  try {
                     entry.connectionExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                           if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V2)) {
                              channel0.send(
                                 new ClusterTopologyChangeMessage_V2(uniqueEventID,
                                                                     nodeID));
                           } else {
                              channel0.send(new ClusterTopologyChangeMessage(nodeID));
                           }
                        }
                     });
                  } catch (RejectedExecutionException ignored) {
                     // this could happen during a shutdown and we don't care, if we lost a nodeDown during a shutdown
                     // what can we do anyways?
                  }
               }

               @Override
               public String toString() {
                  return "Remote Proxy on channel " + Integer
                     .toHexString(System.identityHashCode(this));
               }
            };

            if (acceptorUsed.getClusterConnection() != null) {
               acceptorUsed.getClusterConnection().addClusterTopologyListener(listener);

               rc.addCloseListener(new CloseListener() {
                  @Override
                  public void connectionClosed() {
                     acceptorUsed.getClusterConnection()
                        .removeClusterTopologyListener(listener);
                  }
               });
            } else {
               // if not clustered, we send a single notification to the client containing the node-id where the server is connected to
               // This is done this way so Recovery discovery could also use the node-id for non-clustered setups
               entry.connectionExecutor.execute(new Runnable() {
                  @Override
                  public void run() {
                     String nodeId = server.getNodeID().toString();
                     Pair<TransportConfiguration, TransportConfiguration> emptyConfig = new Pair<>(
                        null, null);
                     if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V4)) {
                        channel0.send(new ClusterTopologyChangeMessage_V4(System.currentTimeMillis(), nodeId,
                           null, null, emptyConfig, true, server.getVersion().getIncrementingVersion()));
                     } else if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V2)) {
                        channel0.send(
                           new ClusterTopologyChangeMessage_V2(System.currentTimeMillis(),
                                                               nodeId, null, emptyConfig, true));
                     } else {
                        channel0.send(
                           new ClusterTopologyChangeMessage(nodeId, emptyConfig, true));
                     }
                  }
               });
            }
         } else if (packet.getType() == PacketImpl.FEDERATION_DOWNSTREAM_CONNECT) {
            //If we receive this packet then a remote broker is requesting us to create federated upstream connection
            //back to it which simulates a downstream connection
            final FederationDownstreamConnectMessage message = (FederationDownstreamConnectMessage) packet;
            final FederationDownstreamConfiguration downstreamConfiguration = message.getStreamConfiguration();

            //Create a new Upstream Federation configuration based on the received Downstream connection message
            //from the remote broker
            //The idea here is to set all the same configuration parameters that apply to the upstream connection
            final FederationConfiguration config = new FederationConfiguration();
            config.setName(message.getName() + FederationDownstreamConnectMessage.UPSTREAM_SUFFIX);
            config.setCredentials(message.getCredentials());

            //Add the policy map configuration
            for (FederationPolicy policy : message.getFederationPolicyMap().values()) {
               config.addFederationPolicy(policy);
            }

            //Add any transformer configurations
            for (FederationTransformerConfiguration transformerConfiguration : message.getTransformerConfigurationMap().values()) {
               config.addTransformerConfiguration(transformerConfiguration);
            }

            //Create an upstream configuration with the same name but apply the upstream suffix so it is unique
            final FederationUpstreamConfiguration upstreamConfiguration = new FederationUpstreamConfiguration()
               .setName(downstreamConfiguration.getName() + FederationDownstreamConnectMessage.UPSTREAM_SUFFIX)
               .addPolicyRefs(downstreamConfiguration.getPolicyRefs());

            //Use the provided Transport Configuration information to create an upstream connection back to the broker that
            //created the downstream connection
            final TransportConfiguration upstreamConfig = downstreamConfiguration.getUpstreamConfiguration();

            //Initialize the upstream transport with the config from the acceptor as this will apply
            //relevant settings such as SSL, then override with settings from the downstream config
            final Map<String, Object> params = new HashMap<>(acceptorUsed.getConfiguration());
            if (upstreamConfig.getParams() != null) {
               params.putAll(upstreamConfig.getParams());
            }
            final Map<String, Object> extraParams = new HashMap<>();
            if (upstreamConfig.getExtraParams() != null) {
               extraParams.putAll(upstreamConfig.getExtraParams());
            }

            //Add the new upstream configuration that was created so we can connect back to the downstream server
            final TransportConfiguration upstreamConf = new TransportConfiguration(
               upstreamConfig.getFactoryClassName(), params, upstreamConfig.getName() + FederationDownstreamConnectMessage.UPSTREAM_SUFFIX,
               extraParams);
            server.getConfiguration()
               .addConnectorConfiguration(upstreamConf.getName() + FederationDownstreamConnectMessage.UPSTREAM_SUFFIX, upstreamConf);

            //Create a new upstream connection config based on the downstream configuration
            FederationConnectionConfiguration downstreamConConf = downstreamConfiguration.getConnectionConfiguration();
            FederationConnectionConfiguration upstreamConConf = upstreamConfiguration.getConnectionConfiguration();
            List<String> connectorNames = new ArrayList<>();
            connectorNames.add(upstreamConf.getName() + FederationDownstreamConnectMessage.UPSTREAM_SUFFIX);

            //Configure all of the upstream connection parameters from the downstream connection that are relevant
            //Note that HA and discoveryGroupName are skipped because the downstream connection will manage that
            //In this case we just want to create a connection back to the broker that sent the downstream packet.
            //If this broker goes down then the original broker (if configured with HA) will re-establish a new
            //connection to another broker which will then create another upstream, etc
            upstreamConConf.setStaticConnectors(connectorNames);
            upstreamConConf.setUsername(downstreamConConf.getUsername());
            upstreamConConf.setPassword(downstreamConConf.getPassword());
            upstreamConConf.setShareConnection(downstreamConConf.isShareConnection());
            upstreamConConf.setPriorityAdjustment(downstreamConConf.getPriorityAdjustment());
            upstreamConConf.setClientFailureCheckPeriod(downstreamConConf.getClientFailureCheckPeriod());
            upstreamConConf.setConnectionTTL(downstreamConConf.getConnectionTTL());
            upstreamConConf.setRetryInterval(downstreamConConf.getRetryInterval());
            upstreamConConf.setRetryIntervalMultiplier(downstreamConConf.getRetryIntervalMultiplier());
            upstreamConConf.setMaxRetryInterval(downstreamConConf.getMaxRetryInterval());
            upstreamConConf.setInitialConnectAttempts(downstreamConConf.getInitialConnectAttempts());
            upstreamConConf.setReconnectAttempts(downstreamConConf.getReconnectAttempts());
            upstreamConConf.setCallTimeout(downstreamConConf.getCallTimeout());
            upstreamConConf.setCallFailoverTimeout(downstreamConConf.getCallFailoverTimeout());
            config.addUpstreamConfiguration(upstreamConfiguration);

            //Register close and failure listeners, if the initial downstream connection goes down then we
            //want to terminate the upstream connection
            rc.addCloseListener(() -> {
               server.getFederationManager().undeploy(config.getName());
            });

            rc.addFailureListener(new FailureListener() {
               @Override
               public void connectionFailed(ActiveMQException exception, boolean failedOver) {
                  server.getFederationManager().undeploy(config.getName());
               }

               @Override
               public void connectionFailed(ActiveMQException exception, boolean failedOver,
                                            String scaleDownTargetNodeID) {
                  server.getFederationManager().undeploy(config.getName());
               }
            });

            try {
               server.getFederationManager().deploy(config);
            } catch (Exception e) {
               logger.error("Error deploying federation", e);
            }
         }
      }