in src/producer/DefaultMQProducerImpl.cpp [180:217]
BatchMessage DefaultMQProducerImpl::buildBatchMessage(std::vector<MQMessage>& msgs) {
if (msgs.size() < 1) {
THROW_MQEXCEPTION(MQClientException, "msgs need one message at least", -1);
}
BatchMessage batchMessage;
bool firstFlag = true;
string topic;
bool waitStoreMsgOK = false;
for (auto& msg : msgs) {
Validators::checkMessage(msg, getMaxMessageSize());
if (!NameSpaceUtil::hasNameSpace(msg.getTopic(), getNameSpace())) {
MessageAccessor::withNameSpace(msg, getNameSpace());
}
if (firstFlag) {
topic = msg.getTopic();
waitStoreMsgOK = msg.isWaitStoreMsgOK();
firstFlag = false;
if (UtilAll::startsWith_retry(topic)) {
THROW_MQEXCEPTION(MQClientException, "Retry Group is not supported for batching", -1);
}
} else {
if (msg.getDelayTimeLevel() > 0) {
THROW_MQEXCEPTION(MQClientException, "TimeDelayLevel in not supported for batching", -1);
}
if (msg.getTopic() != topic) {
THROW_MQEXCEPTION(MQClientException, "msgs need one message at least", -1);
}
if (msg.isWaitStoreMsgOK() != waitStoreMsgOK) {
THROW_MQEXCEPTION(MQClientException, "msgs need one message at least", -2);
}
}
}
batchMessage.setBody(BatchMessage::encode(msgs));
batchMessage.setTopic(topic);
batchMessage.setWaitStoreMsgOK(waitStoreMsgOK);
return batchMessage;
}