in activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java [96:236]
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
    // Statistics requests are messages that carry a reply-to and whose destination name starts
    // with STATS_PREFIX. They are answered with map messages sent to the reply-to destination
    // and are not passed down the broker chain; every other message goes to super.send().
    ActiveMQDestination msgDest = messageSend.getDestination();
    ActiveMQDestination replyTo = messageSend.getReplyTo();
    if (replyTo == null || !msgDest.getPhysicalName().startsWith(STATS_PREFIX)) {
        super.send(producerExchange, messageSend);
        return;
    }

    String physicalName = msgDest.getPhysicalName();
    // Resolved before the request type is examined, so the cast is evaluated for every
    // statistics-prefixed message (including ones that end up being forwarded below).
    BrokerService brokerService = getBrokerService();
    RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();

    // Branch order matters only if the prefixes overlap: destination, then subscription, then broker.
    if (physicalName.startsWith(STATS_DESTINATION_PREFIX)) {
        replyWithDestinationStats(producerExchange, messageSend, msgDest, replyTo, regionBroker);
    } else if (physicalName.startsWith(STATS_SUBSCRIPTION_PREFIX)) {
        sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getQueueSubscribers(), replyTo);
        sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getTopicSubscribers(), replyTo);
    } else if (physicalName.startsWith(STATS_BROKER_PREFIX)) {
        replyWithBrokerStats(producerExchange, messageSend, replyTo, brokerService, regionBroker);
    } else {
        // Statistics prefix but no recognised request type: treat it as an ordinary message.
        super.send(producerExchange, messageSend);
    }
}

/**
 * Answers a destination-statistics request: sends one map message per matching destination
 * that exposes statistics and, when an end-of-list marker was requested, a final map message
 * that carries only the correlation id.
 *
 * @param producerExchange exchange of the requesting producer; supplies the connection context
 * @param messageSend the request message (properties and correlation id are read from it)
 * @param msgDest destination the request was addressed to
 * @param replyTo destination the replies are sent to
 * @param regionBroker source of the broker name and broker id put into each reply
 * @throws Exception if reading the request, building a reply or sending it fails
 */
private void replyWithDestinationStats(ProducerBrokerExchange producerExchange, Message messageSend,
        ActiveMQDestination msgDest, ActiveMQDestination replyTo, RegionBroker regionBroker) throws Exception {
    // Whatever follows the prefix (minus one optional leading dot) is the destination query.
    String destinationName = msgDest.getPhysicalName().substring(STATS_DESTINATION_PREFIX.length());
    if (destinationName.startsWith(".")) {
        destinationName = destinationName.substring(1);
    }
    // The end-of-list marker may be embedded in the name or supplied as a message property.
    String destinationQuery = destinationName.replace(STATS_DENOTE_END_LIST, "");
    boolean endListMessage = !destinationName.equals(destinationQuery)
            || messageSend.getProperties().containsKey(STATS_DENOTE_END_LIST);
    ActiveMQDestination queryDestination =
            ActiveMQDestination.createDestination(destinationQuery, msgDest.getDestinationType());
    Set<Destination> destinations = getDestinations(queryDestination);
    boolean includeFirstMessageTimestamp =
            messageSend.getProperties().containsKey(STATS_FIRST_MESSAGE_TIMESTAMP);

    for (Destination dest : destinations) {
        DestinationStatistics stats = dest.getDestinationStatistics();
        if (stats == null) {
            // Nothing to report for this destination.
            continue;
        }
        ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
        statsMessage.setString("brokerName", regionBroker.getBrokerName());
        statsMessage.setString("brokerId", regionBroker.getBrokerId().toString());
        statsMessage.setString("destinationName", dest.getActiveMQDestination().toString());
        statsMessage.setLong("size", stats.getMessages().getCount());
        statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount());
        statsMessage.setLong("dequeueCount", stats.getDequeues().getCount());
        statsMessage.setLong("dispatchCount", stats.getDispatched().getCount());
        statsMessage.setLong("expiredCount", stats.getExpired().getCount());
        statsMessage.setLong("inflightCount", stats.getInflight().getCount());
        statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
        // we are okay with the size without decimals so cast to long
        statsMessage.setLong("averageMessageSize", (long) stats.getMessageSize().getAverageSize());
        statsMessage.setInt("memoryPercentUsage", dest.getMemoryUsage().getPercentUsage());
        statsMessage.setLong("memoryUsage", dest.getMemoryUsage().getUsage());
        statsMessage.setLong("memoryLimit", dest.getMemoryUsage().getLimit());
        statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime());
        statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime());
        statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
        statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
        statsMessage.setLong("producerCount", stats.getProducers().getCount());
        if (includeFirstMessageTimestamp) {
            addFirstMessageTimestamp(statsMessage, dest);
        }
        statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
        sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
    }

    if (endListMessage) {
        // Map message with no entries: only the correlation id is set.
        ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
        statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
        sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
    }
}

/**
 * Browses at most one message of the given destination and, if one is found, stores its
 * broker-in time under the key {@code firstMessageTimestamp}. Destinations that are neither
 * a {@code Queue} nor a {@code Topic} after unwrapping are left without that entry.
 *
 * @param statsMessage reply message to add the entry to
 * @param dest destination to browse; may be wrapped in one or more {@code DestinationFilter}s
 * @throws Exception if browsing the destination or writing the entry fails
 */
private void addFirstMessageTimestamp(ActiveMQMapMessage statsMessage, Destination dest) throws Exception {
    // AMQ-9452: peel off DestinationFilter wrappers so the instanceof checks below see the
    // underlying destination. A separate local is used so the caller's reference is untouched.
    Destination target = dest;
    while (target instanceof DestinationFilter) {
        target = ((DestinationFilter) target).getNext();
    }

    List<Message> firstMessage = new ArrayList<>(1);
    if (target instanceof Queue) {
        ((Queue) target).doBrowse(firstMessage, 1);
    } else if (target instanceof Topic) {
        ((Topic) target).doBrowse(firstMessage, 1);
    }

    if (!firstMessage.isEmpty()) {
        // NOTICE: Client-side, you may get the broker "now" Timestamp by msg.getJMSTimestamp()
        // This allows for calculating age.
        statsMessage.setLong("firstMessageTimestamp", firstMessage.get(0).getBrokerInTime());
    }
}

/**
 * Answers a broker-statistics request with a single map message holding broker-wide counters,
 * memory/store/temp usage, connector URIs and the data directory. If the request carries the
 * {@code STATS_BROKER_RESET_HEADER} property, the broker view's statistics are reset before
 * the reply is built.
 *
 * @param producerExchange exchange of the requesting producer; supplies the connection context
 * @param messageSend the request message (properties and correlation id are read from it)
 * @param replyTo destination the reply is sent to
 * @param brokerService source of system usage, connector URIs and the data directory
 * @param regionBroker source of the broker name, broker id and aggregated statistics
 * @throws Exception if reading the request, building the reply or sending it fails
 */
private void replyWithBrokerStats(ProducerBrokerExchange producerExchange, Message messageSend,
        ActiveMQDestination replyTo, BrokerService brokerService, RegionBroker regionBroker) throws Exception {
    if (messageSend.getProperties().containsKey(STATS_BROKER_RESET_HEADER)) {
        getBrokerView().resetStatistics();
    }
    ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
    SystemUsage systemUsage = brokerService.getSystemUsage();
    DestinationStatistics stats = regionBroker.getDestinationStatistics();
    statsMessage.setString("brokerName", regionBroker.getBrokerName());
    statsMessage.setString("brokerId", regionBroker.getBrokerId().toString());
    statsMessage.setLong("size", stats.getMessages().getCount());
    statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount());
    statsMessage.setLong("dequeueCount", stats.getDequeues().getCount());
    statsMessage.setLong("dispatchCount", stats.getDispatched().getCount());
    statsMessage.setLong("expiredCount", stats.getExpired().getCount());
    statsMessage.setLong("inflightCount", stats.getInflight().getCount());
    // we are okay with the size without decimals so cast to long
    statsMessage.setLong("averageMessageSize", (long) stats.getMessageSize().getAverageSize());
    statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
    statsMessage.setInt("memoryPercentUsage", systemUsage.getMemoryUsage().getPercentUsage());
    statsMessage.setLong("memoryUsage", systemUsage.getMemoryUsage().getUsage());
    statsMessage.setLong("memoryLimit", systemUsage.getMemoryUsage().getLimit());
    statsMessage.setInt("storePercentUsage", systemUsage.getStoreUsage().getPercentUsage());
    statsMessage.setLong("storeUsage", systemUsage.getStoreUsage().getUsage());
    statsMessage.setLong("storeLimit", systemUsage.getStoreUsage().getLimit());
    statsMessage.setInt("tempPercentUsage", systemUsage.getTempUsage().getPercentUsage());
    statsMessage.setLong("tempUsage", systemUsage.getTempUsage().getUsage());
    statsMessage.setLong("tempLimit", systemUsage.getTempUsage().getLimit());
    statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime());
    statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime());
    statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
    statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
    statsMessage.setLong("producerCount", stats.getProducers().getCount());

    // Fetch the scheme -> URI map once and read all four schemes from the same snapshot.
    // A missing scheme is reported as an empty string.
    java.util.Map<String, String> connectorUris = brokerService.getTransportConnectorURIsAsMap();
    String tcpUri = connectorUris.get("tcp");
    statsMessage.setString("openwire", tcpUri != null ? tcpUri : "");
    String stompUri = connectorUris.get("stomp");
    statsMessage.setString("stomp", stompUri != null ? stompUri : "");
    String sslUri = connectorUris.get("ssl");
    statsMessage.setString("ssl", sslUri != null ? sslUri : "");
    String stompSslUri = connectorUris.get("stomp+ssl");
    statsMessage.setString("stomp+ssl", stompSslUri != null ? stompSslUri : "");

    URI vmUri = brokerService.getVmConnectorURI();
    statsMessage.setString("vm", vmUri != null ? vmUri.toString() : "");
    File dataDirectory = brokerService.getDataDirectoryFile();
    statsMessage.setString("dataDirectory", dataDirectory != null ? dataDirectory.getCanonicalPath() : "");
    statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
    sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
}