in ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java [125:153]
public SendResult send(final Message message, final String shardingKey) {
if (UtilAll.isBlank(shardingKey)) {
throw new ONSClientException("\'shardingKey\' is blank.");
}
message.setShardingKey(shardingKey);
this.checkONSProducerServiceState(this.defaultMQProducer.getDefaultMQProducerImpl());
final org.apache.rocketmq.common.message.Message msgRMQ = ONSUtil.msgConvert(message);
try {
org.apache.rocketmq.client.producer.SendResult sendResultRMQ =
this.defaultMQProducer.send(msgRMQ, new org.apache.rocketmq.client.producer.MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, org.apache.rocketmq.common.message.Message msg,
Object shardingKey) {
int select = Math.abs(shardingKey.hashCode());
if (select < 0) {
select = 0;
}
return mqs.get(select % mqs.size());
}
}, shardingKey);
message.setMsgID(sendResultRMQ.getMsgId());
SendResult sendResult = new SendResult();
sendResult.setTopic(message.getTopic());
sendResult.setMessageId(sendResultRMQ.getMsgId());
return sendResult;
} catch (Exception e) {
throw new ONSClientException("defaultMQProducer send order exception", e);
}
}