void SimpleConsumerImpl::receive()

in cpp/source/rocketmq/SimpleConsumerImpl.cpp [293:360]


void SimpleConsumerImpl::receive(std::size_t limit,
                                 std::chrono::milliseconds invisible_duration,
                                 ReceiveCallback callback) {
  rmq::Assignment assignment;
  {
    absl::MutexLock lk(&assignments_mtx_);
    if (assignments_.empty()) {
      std::error_code ec = ErrorCode::NotFound;
      std::vector<MessageConstSharedPtr> messages;
      callback(ec, messages);
      return;
    }

    // choose assign allow readable
    std::size_t start_index = ++assignment_index_ % assignments_.size();
    for (std::size_t i = 0; i < assignments_.size(); ++i) {
      const auto& assign = assignments_[(start_index + i) % assignments_.size()];
      if (readable(assign.message_queue().permission())) {
        assignment.CopyFrom(assign);
        break;
      }
    }

    if (!assignment.IsInitialized()) {
      std::error_code ec = ErrorCode::NotFound;
      std::vector<MessageConstSharedPtr> messages;
      callback(ec, messages);
      return;
    }
  }

  const auto& target = urlOf(assignment.message_queue());
  Metadata metadata;
  Signature::sign(client_config_, metadata);

  rmq::ReceiveMessageRequest request;
  request.set_auto_renew(false);
  request.mutable_group()->CopyFrom(config().subscriber.group);
  request.mutable_message_queue()->CopyFrom(assignment.message_queue());
  request.set_batch_size((int32_t) limit);

  request.mutable_filter_expression()->set_type(rmq::FilterType::TAG);
  request.mutable_filter_expression()->set_expression("*");

  auto invisible_duration_request =
      google::protobuf::util::TimeUtil::MillisecondsToDuration(invisible_duration.count());
  request.mutable_invisible_duration()->set_nanos(invisible_duration_request.nanos());
  request.mutable_invisible_duration()->set_seconds(invisible_duration_request.seconds());

  auto await_duration_request =
      google::protobuf::util::TimeUtil::MillisecondsToDuration(
          MixAll::millisecondsOf(long_polling_duration_));
  request.mutable_long_polling_timeout()->set_nanos(await_duration_request.nanos());
  request.mutable_long_polling_timeout()->set_seconds(await_duration_request.seconds());

  auto cb = [callback](const std::error_code& ec, const ReceiveMessageResult& result) {
    std::vector<MessageConstSharedPtr> messages;
    if (ec) {
      callback(ec, messages);
      return;
    }

    callback(ec, result.messages);
  };

  manager()->receiveMessage(target, metadata, request,
                            long_polling_duration_ + absl::ToChronoMilliseconds(requestTimeout()), cb);
}