private void sendTraceDataByMQ()

in ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/AsyncArrayDispatcher.java [292:338]


        private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic,
            String currentRegionId) {
            String topic = OnsTraceConstants.traceTopic + currentRegionId;
            final Message message = new Message(topic, data.getBytes());
            message.setKeys(keySet);
            try {
                Set<String> dataBrokerSet = getBrokerSetByTopic(dataTopic);
                Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), topic);
                dataBrokerSet.retainAll(traceBrokerSet);
                SendCallback callback = new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                    }

                    @Override
                    public void onException(Throwable e) {
                        CLIENT_LOG.info("send trace data ,the traceData is " + data);
                    }
                };
                if (dataBrokerSet.isEmpty()) {
                    //no cross set
                    traceProducer.send(message, callback, 5000);
                } else {
                    traceProducer.send(message, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Set<String> brokerSet = (Set<String>) arg;
                            List<MessageQueue> filterMqs = new ArrayList<MessageQueue>();
                            for (MessageQueue queue : mqs) {
                                if (brokerSet.contains(queue.getBrokerName())) {
                                    filterMqs.add(queue);
                                }
                            }
                            int index = sendWhichQueue.getAndIncrement();
                            int pos = Math.abs(index) % filterMqs.size();
                            if (pos < 0) {
                                pos = 0;
                            }
                            return filterMqs.get(pos);
                        }
                    }, dataBrokerSet, callback);
                }

            } catch (Exception e) {
                CLIENT_LOG.info("send trace data,the traceData is" + data);
            }
        }