in artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java [285:523]
public void onNotification(final Notification notification) {
if (!(notification.getType() instanceof CoreNotificationType))
return;
logger.trace("Receiving notification : {} on server {}", notification, server);
synchronized (notificationLock) {
CoreNotificationType type = (CoreNotificationType) notification.getType();
switch (type) {
case BINDING_ADDED: {
TypedProperties props = notification.getProperties();
if (!props.containsProperty(ManagementHelper.HDR_BINDING_TYPE)) {
throw ActiveMQMessageBundle.BUNDLE.bindingTypeNotSpecified();
}
Integer bindingType = props.getIntProperty(ManagementHelper.HDR_BINDING_TYPE);
if (bindingType == BindingType.DIVERT_INDEX) {
// We don't propagate diverts
return;
}
SimpleString routingName = props.getSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME);
SimpleString clusterName = props.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
SimpleString address = props.getSimpleStringProperty(ManagementHelper.HDR_ADDRESS);
if (!props.containsProperty(ManagementHelper.HDR_BINDING_ID)) {
throw ActiveMQMessageBundle.BUNDLE.bindingIdNotSpecified();
}
long id = props.getLongProperty(ManagementHelper.HDR_BINDING_ID);
SimpleString filterString = props.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);
if (!props.containsProperty(ManagementHelper.HDR_DISTANCE)) {
logger.debug("PostOffice notification / BINDING_ADDED: HDR_DISANCE not specified, giving up propagation on notifications");
return;
}
int distance = props.getIntProperty(ManagementHelper.HDR_DISTANCE);
QueueInfo info = new QueueInfo(routingName, clusterName, address, filterString, id, distance);
queueInfos.put(clusterName, info);
if (distance < 1) {
//Binding added locally. If a matching remote binding with consumers exist, add a redistributor
Binding binding = addressManager.getBinding(routingName);
if (binding != null) {
Queue queue = (Queue) binding.getBindable();
AddressSettings addressSettings = addressSettingsRepository.getMatch(binding.getAddress().toString());
long redistributionDelay = addressSettings.getRedistributionDelay();
if (redistributionDelay == -1) {
//No need to keep looking since redistribution is not enabled
break;
}
try {
Bindings bindings = getBindingsForAddress(address);
for (Binding bind : bindings.getBindings()) {
if (bind.isConnected() && bind instanceof RemoteQueueBinding remoteBinding) {
if (remoteBinding.consumerCount() > 0) {
queue.addRedistributor(redistributionDelay);
break;
}
}
}
} catch (Exception ignore) {
}
}
}
break;
}
case BINDING_REMOVED: {
TypedProperties props = notification.getProperties();
if (!props.containsProperty(ManagementHelper.HDR_CLUSTER_NAME)) {
logger.debug("PostOffice notification / BINDING_REMOVED: HDR_CLUSTER_NAME not specified, giving up propagation on notifications");
return;
}
SimpleString clusterName = props.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
QueueInfo info = queueInfos.remove(clusterName);
if (info == null) {
logger.debug("PostOffice notification / BINDING_REMOVED: Cannot find queue info for clusterName {}", clusterName);
return;
}
break;
}
case CONSUMER_CREATED: {
TypedProperties props = notification.getProperties();
if (!props.containsProperty(ManagementHelper.HDR_CLUSTER_NAME)) {
logger.debug("PostOffice notification / CONSUMER_CREATED: No clusterName defined");
return;
}
SimpleString clusterName = props.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
SimpleString filterString = props.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);
QueueInfo info = queueInfos.get(clusterName);
if (info == null) {
logger.debug("PostOffice notification / CONSUMER_CREATED: Could not find queue created on clusterName = {}", clusterName);
return;
}
info.incrementConsumers();
if (filterString != null) {
List<SimpleString> filterStrings = info.getFilterStrings();
if (filterStrings == null) {
filterStrings = new ArrayList<>();
info.setFilterStrings(filterStrings);
}
filterStrings.add(filterString);
}
if (!props.containsProperty(ManagementHelper.HDR_DISTANCE)) {
logger.debug("PostOffice notification / CONSUMER_CREATED: No distance specified");
return;
}
int distance = props.getIntProperty(ManagementHelper.HDR_DISTANCE);
if (distance > 0) {
SimpleString queueName = props.getSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME);
if (queueName == null) {
logger.debug("PostOffice notification / CONSUMER_CREATED: No queue defined");
return;
}
SimpleString addressName = props.getSimpleStringProperty(ManagementHelper.HDR_ADDRESS);
Binding binding = addressManager.getBinding(CompositeAddress.isFullyQualified(addressName) ? addressName : queueName);
if (binding != null) {
// We have a local queue
Queue queue = (Queue) binding.getBindable();
AddressSettings addressSettings = addressSettingsRepository.getMatch(binding.getAddress().toString());
long redistributionDelay = addressSettings.getRedistributionDelay();
if (redistributionDelay != -1) {
queue.addRedistributor(redistributionDelay);
}
}
}
break;
}
case CONSUMER_CLOSED: {
TypedProperties props = notification.getProperties();
SimpleString clusterName = props.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
if (clusterName == null) {
logger.debug("PostOffice notification / CONSUMER_CLOSED: No cluster name");
return;
}
SimpleString filterString = props.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);
QueueInfo info = queueInfos.get(clusterName);
if (info == null) {
return;
}
info.decrementConsumers();
if (filterString != null) {
List<SimpleString> filterStrings = info.getFilterStrings();
filterStrings.remove(filterString);
}
// The consumer count should never be < 0 but we should catch here just in case.
if (info.getNumberOfConsumers() <= 0) {
if (!props.containsProperty(ManagementHelper.HDR_DISTANCE)) {
logger.debug("PostOffice notification / CONSUMER_CLOSED: HDR_DISTANCE not defined");
return;
}
int distance = props.getIntProperty(ManagementHelper.HDR_DISTANCE);
if (distance == 0) {
SimpleString queueName = props.getSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME);
if (queueName == null) {
logger.debug("PostOffice notification / CONSUMER_CLOSED: No queue name");
return;
}
Binding binding = addressManager.getBinding(queueName);
if (binding == null) {
logger.debug("PostOffice notification / CONSUMER_CLOSED: Could not find queue {}", queueName);
return;
}
Queue queue = (Queue) binding.getBindable();
AddressSettings addressSettings = addressSettingsRepository.getMatch(binding.getAddress().toString());
long redistributionDelay = addressSettings.getRedistributionDelay();
if (redistributionDelay != -1) {
queue.addRedistributor(redistributionDelay);
}
}
}
break;
}
default: {
break;
}
}
}
}