public void send()

in activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java [96:236]


    /**
     * Answers statistics requests and forwards every other message down the broker chain.
     * <p>
     * A message is treated as a statistics request only when it carries a reply-to destination
     * and its destination name starts with {@code STATS_PREFIX}. Depending on the more specific
     * prefix, the reply consists of per-destination statistics, subscription statistics or
     * broker-wide statistics, each sent as map messages to the reply-to destination. A request
     * under {@code STATS_PREFIX} that matches none of the specific prefixes is forwarded like a
     * regular message.
     *
     * @param producerExchange the exchange of the sending producer; its connection context is
     *                         used when sending the replies
     * @param messageSend      the incoming message
     * @throws Exception if building or sending a reply fails, or if the delegated send fails
     */
    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
        ActiveMQDestination msgDest = messageSend.getDestination();
        ActiveMQDestination replyTo = messageSend.getReplyTo();

        // Without a reply destination, or outside the statistics namespace, this is an ordinary send.
        if (replyTo == null || !msgDest.getPhysicalName().startsWith(STATS_PREFIX)) {
            super.send(producerExchange, messageSend);
            return;
        }

        String physicalName = msgDest.getPhysicalName();
        BrokerService brokerService = getBrokerService();
        RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();

        if (physicalName.startsWith(STATS_DESTINATION_PREFIX)) {
            sendDestinationStats(producerExchange, messageSend, msgDest, replyTo, regionBroker);
        } else if (physicalName.startsWith(STATS_SUBSCRIPTION_PREFIX)) {
            sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getQueueSubscribers(), replyTo);
            sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getTopicSubscribers(), replyTo);
        } else if (physicalName.startsWith(STATS_BROKER_PREFIX)) {
            sendBrokerStats(producerExchange, messageSend, replyTo, brokerService, regionBroker);
        } else {
            super.send(producerExchange, messageSend);
        }
    }

    /**
     * Replies with one statistics map message per destination matching the query that follows
     * {@code STATS_DESTINATION_PREFIX} in the request's destination name, optionally followed
     * by an end-of-list message that carries only the correlation id.
     */
    private void sendDestinationStats(ProducerBrokerExchange producerExchange, Message messageSend,
            ActiveMQDestination msgDest, ActiveMQDestination replyTo, RegionBroker regionBroker) throws Exception {
        String destinationName = msgDest.getPhysicalName().substring(STATS_DESTINATION_PREFIX.length());
        if (destinationName.startsWith(".")) {
            destinationName = destinationName.substring(1);
        }

        // The end-of-list marker may be embedded in the destination name or supplied as a property.
        String destinationQuery = destinationName.replace(STATS_DENOTE_END_LIST, "");
        boolean endListMessage = !destinationName.equals(destinationQuery)
                || messageSend.getProperties().containsKey(STATS_DENOTE_END_LIST);

        ActiveMQDestination queryDestination =
                ActiveMQDestination.createDestination(destinationQuery, msgDest.getDestinationType());
        Set<Destination> destinations = getDestinations(queryDestination);

        boolean includeFirstMessageTimestamp =
                messageSend.getProperties().containsKey(STATS_FIRST_MESSAGE_TIMESTAMP);
        // Reusable single-slot buffer for browsing the first message of each destination.
        List<Message> firstMessageBuffer = includeFirstMessageTimestamp ? new ArrayList<>(1) : null;

        for (Destination dest : destinations) {
            DestinationStatistics stats = dest.getDestinationStatistics();
            if (stats == null) {
                continue;
            }
            ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
            statsMessage.setString("brokerName", regionBroker.getBrokerName());
            statsMessage.setString("brokerId", regionBroker.getBrokerId().toString());
            statsMessage.setString("destinationName", dest.getActiveMQDestination().toString());
            statsMessage.setLong("size", stats.getMessages().getCount());
            statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount());
            statsMessage.setLong("dequeueCount", stats.getDequeues().getCount());
            statsMessage.setLong("dispatchCount", stats.getDispatched().getCount());
            statsMessage.setLong("expiredCount", stats.getExpired().getCount());
            statsMessage.setLong("inflightCount", stats.getInflight().getCount());
            statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
            // we are okay with the size without decimals so cast to long
            statsMessage.setLong("averageMessageSize", (long) stats.getMessageSize().getAverageSize());
            statsMessage.setInt("memoryPercentUsage", dest.getMemoryUsage().getPercentUsage());
            statsMessage.setLong("memoryUsage", dest.getMemoryUsage().getUsage());
            statsMessage.setLong("memoryLimit", dest.getMemoryUsage().getLimit());
            statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime());
            statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime());
            statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
            statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
            statsMessage.setLong("producerCount", stats.getProducers().getCount());

            if (includeFirstMessageTimestamp) {
                // AMQ-9452: unwrap BaseDestination. A separate local is used so the loop
                // variable itself is never reassigned.
                Destination target = dest;
                while (target instanceof DestinationFilter) {
                    target = ((DestinationFilter) target).getNext();
                }
                if (target instanceof Queue) {
                    ((Queue) target).doBrowse(firstMessageBuffer, 1);
                } else if (target instanceof Topic) {
                    ((Topic) target).doBrowse(firstMessageBuffer, 1);
                }
                if (!firstMessageBuffer.isEmpty()) {
                    Message firstMessage = firstMessageBuffer.get(0);
                    // NOTICE: Client-side, you may get the broker "now" Timestamp by msg.getJMSTimestamp()
                    // This allows for calculating age.
                    statsMessage.setLong("firstMessageTimestamp", firstMessage.getBrokerInTime());
                    // Empty the buffer so the next destination starts clean.
                    firstMessageBuffer.clear();
                }
            }

            statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
            sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
        }

        if (endListMessage) {
            // Terminator: a map message with no entries, only the request's correlation id.
            ActiveMQMapMessage endOfList = new ActiveMQMapMessage();
            endOfList.setJMSCorrelationID(messageSend.getCorrelationId());
            sendStats(producerExchange.getConnectionContext(), endOfList, replyTo);
        }
    }

    /**
     * Replies with a single map message holding broker-wide statistics, usage figures,
     * connector URIs and the data directory; resets the statistics first when the request
     * carries the {@code STATS_BROKER_RESET_HEADER} property.
     */
    private void sendBrokerStats(ProducerBrokerExchange producerExchange, Message messageSend,
            ActiveMQDestination replyTo, BrokerService brokerService, RegionBroker regionBroker) throws Exception {
        if (messageSend.getProperties().containsKey(STATS_BROKER_RESET_HEADER)) {
            getBrokerView().resetStatistics();
        }

        ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
        SystemUsage systemUsage = brokerService.getSystemUsage();
        DestinationStatistics stats = regionBroker.getDestinationStatistics();
        statsMessage.setString("brokerName", regionBroker.getBrokerName());
        statsMessage.setString("brokerId", regionBroker.getBrokerId().toString());
        statsMessage.setLong("size", stats.getMessages().getCount());
        statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount());
        statsMessage.setLong("dequeueCount", stats.getDequeues().getCount());
        statsMessage.setLong("dispatchCount", stats.getDispatched().getCount());
        statsMessage.setLong("expiredCount", stats.getExpired().getCount());
        statsMessage.setLong("inflightCount", stats.getInflight().getCount());
        // we are okay with the size without decimals so cast to long
        statsMessage.setLong("averageMessageSize", (long) stats.getMessageSize().getAverageSize());
        statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
        statsMessage.setInt("memoryPercentUsage", systemUsage.getMemoryUsage().getPercentUsage());
        statsMessage.setLong("memoryUsage", systemUsage.getMemoryUsage().getUsage());
        statsMessage.setLong("memoryLimit", systemUsage.getMemoryUsage().getLimit());
        statsMessage.setInt("storePercentUsage", systemUsage.getStoreUsage().getPercentUsage());
        statsMessage.setLong("storeUsage", systemUsage.getStoreUsage().getUsage());
        statsMessage.setLong("storeLimit", systemUsage.getStoreUsage().getLimit());
        statsMessage.setInt("tempPercentUsage", systemUsage.getTempUsage().getPercentUsage());
        statsMessage.setLong("tempUsage", systemUsage.getTempUsage().getUsage());
        statsMessage.setLong("tempLimit", systemUsage.getTempUsage().getLimit());
        statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime());
        statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime());
        statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
        statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
        statsMessage.setLong("producerCount", stats.getProducers().getCount());

        // Fetch the connector map once so all four lookups read the same snapshot.
        java.util.Map<String, String> connectorUris = brokerService.getTransportConnectorURIsAsMap();
        String tcpUri = connectorUris.get("tcp");
        // The "tcp" connector is reported under the "openwire" key.
        statsMessage.setString("openwire", tcpUri != null ? tcpUri : "");
        String stompUri = connectorUris.get("stomp");
        statsMessage.setString("stomp", stompUri != null ? stompUri : "");
        String sslUri = connectorUris.get("ssl");
        statsMessage.setString("ssl", sslUri != null ? sslUri : "");
        String stompSslUri = connectorUris.get("stomp+ssl");
        statsMessage.setString("stomp+ssl", stompSslUri != null ? stompSslUri : "");

        URI vmUri = brokerService.getVmConnectorURI();
        statsMessage.setString("vm", vmUri != null ? vmUri.toString() : "");
        File dataDirectory = brokerService.getDataDirectoryFile();
        statsMessage.setString("dataDirectory", dataDirectory != null ? dataDirectory.getCanonicalPath() : "");

        statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
        sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
    }