in ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/AsyncArrayDispatcher.java [292:338]
private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic,
String currentRegionId) {
String topic = OnsTraceConstants.traceTopic + currentRegionId;
final Message message = new Message(topic, data.getBytes());
message.setKeys(keySet);
try {
Set<String> dataBrokerSet = getBrokerSetByTopic(dataTopic);
Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), topic);
dataBrokerSet.retainAll(traceBrokerSet);
SendCallback callback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(Throwable e) {
CLIENT_LOG.info("send trace data ,the traceData is " + data);
}
};
if (dataBrokerSet.isEmpty()) {
//no cross set
traceProducer.send(message, callback, 5000);
} else {
traceProducer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Set<String> brokerSet = (Set<String>) arg;
List<MessageQueue> filterMqs = new ArrayList<MessageQueue>();
for (MessageQueue queue : mqs) {
if (brokerSet.contains(queue.getBrokerName())) {
filterMqs.add(queue);
}
}
int index = sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % filterMqs.size();
if (pos < 0) {
pos = 0;
}
return filterMqs.get(pos);
}
}, dataBrokerSet, callback);
}
} catch (Exception e) {
CLIENT_LOG.info("send trace data,the traceData is" + data);
}
}