in cpp/source/rocketmq/SimpleConsumerImpl.cpp [293:360]
// Asynchronously receives up to `limit` messages from one of the currently
// assigned message queues.
//
// The queue is picked round-robin (driven by `assignment_index_`) among the
// entries of `assignments_` whose permission passes `readable()`. If there are
// no assignments, or none of them is readable, `callback` is invoked
// synchronously with ErrorCode::NotFound and an empty message list. Otherwise
// a ReceiveMessageRequest is built and handed to the client manager; the
// outcome is delivered through `callback` by the manager's completion path.
//
// @param limit              Batch size placed in the request.
// @param invisible_duration Invisible duration placed in the request.
// @param callback           Invoked with the resulting error code and the
//                           received messages (empty on error).
void SimpleConsumerImpl::receive(std::size_t limit,
                                 std::chrono::milliseconds invisible_duration,
                                 ReceiveCallback callback) {
  rmq::Assignment assignment;
  // Explicit flag: Message::IsInitialized() only reports missing *required*
  // fields, so it cannot be used to detect "no assignment was selected".
  bool found = false;
  {
    absl::MutexLock lk(&assignments_mtx_);
    const std::size_t count = assignments_.size();
    if (count > 0) {
      // Advance the round-robin cursor, then scan at most one full cycle for
      // the first queue that allows reading.
      const std::size_t start_index = ++assignment_index_ % count;
      for (std::size_t i = 0; i < count; ++i) {
        const auto& candidate = assignments_[(start_index + i) % count];
        if (readable(candidate.message_queue().permission())) {
          assignment.CopyFrom(candidate);
          found = true;
          break;
        }
      }
    }
  }

  if (!found) {
    // Report outside the critical section so a callback that re-enters this
    // consumer cannot deadlock on assignments_mtx_.
    std::error_code ec = ErrorCode::NotFound;
    std::vector<MessageConstSharedPtr> messages;
    callback(ec, messages);
    return;
  }

  const auto& target = urlOf(assignment.message_queue());

  Metadata metadata;
  Signature::sign(client_config_, metadata);

  rmq::ReceiveMessageRequest request;
  request.set_auto_renew(false);
  request.mutable_group()->CopyFrom(config().subscriber.group);
  request.mutable_message_queue()->CopyFrom(assignment.message_queue());
  request.set_batch_size(static_cast<int32_t>(limit));

  // Subscribe to every tag.
  request.mutable_filter_expression()->set_type(rmq::FilterType::TAG);
  request.mutable_filter_expression()->set_expression("*");

  request.mutable_invisible_duration()->CopyFrom(
      google::protobuf::util::TimeUtil::MillisecondsToDuration(invisible_duration.count()));
  request.mutable_long_polling_timeout()->CopyFrom(
      google::protobuf::util::TimeUtil::MillisecondsToDuration(
          MixAll::millisecondsOf(long_polling_duration_)));

  auto cb = [callback](const std::error_code& ec, const ReceiveMessageResult& result) {
    if (ec) {
      // On failure always hand the caller an empty list, regardless of what
      // the result object carries.
      std::vector<MessageConstSharedPtr> messages;
      callback(ec, messages);
      return;
    }
    callback(ec, result.messages);
  };

  // The timeout passed to the manager covers the long-polling window plus the
  // regular request timeout.
  manager()->receiveMessage(target, metadata, request,
                            long_polling_duration_ + absl::ToChronoMilliseconds(requestTimeout()), cb);
}