void ProducerImpl::wrapSendMessageRequest()

in cpp/source/rocketmq/ProducerImpl.cpp [110:193]


void ProducerImpl::wrapSendMessageRequest(const Message& message, SendMessageRequest& request,
                                          const rmq::MessageQueue& message_queue) {
  auto msg = new rmq::Message();

  msg->mutable_topic()->set_resource_namespace(resourceNamespace());
  msg->mutable_topic()->set_name(message.topic());

  auto system_properties = msg->mutable_system_properties();

  // Handle Tag
  if (!message.tag().empty()) {
    system_properties->set_tag(message.tag());
  }

  // Handle Key
  const auto& keys = message.keys();
  if (!keys.empty()) {
    system_properties->mutable_keys()->Add(keys.begin(), keys.end());
  }

  // TraceContext
  if (!message.traceContext().empty()) {
    const auto& trace_context = message.traceContext();
    if (!trace_context.empty()) {
      system_properties->set_trace_context(trace_context);
    }
  }

  // Delivery Timestamp
  if (message.deliveryTimestamp().time_since_epoch().count()) {
    auto delivery_timestamp = message.deliveryTimestamp();
    if (delivery_timestamp.time_since_epoch().count()) {
      auto duration = delivery_timestamp.time_since_epoch();
      system_properties->set_delivery_attempt(std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());
      auto mutable_delivery_timestamp = system_properties->mutable_delivery_timestamp();
      mutable_delivery_timestamp->set_seconds(std::chrono::duration_cast<std::chrono::seconds>(duration).count());
      mutable_delivery_timestamp->set_nanos(std::chrono::duration_cast<std::chrono::nanoseconds>(duration).count() % 1000000000);
    }
  }

  // Born-time
  auto duration = absl::Now() - absl::UnixEpoch();
  int64_t seconds = absl::ToInt64Seconds(duration);
  system_properties->mutable_born_timestamp()->set_seconds(seconds);
  system_properties->mutable_born_timestamp()->set_nanos(absl::ToInt64Nanoseconds(duration - absl::Seconds(seconds)));

  system_properties->set_born_host(UtilAll::hostname());

  if (message.deliveryTimestamp().time_since_epoch().count()) {
    system_properties->set_message_type(rmq::MessageType::DELAY);
  } else if (!message.group().empty()) {
    system_properties->set_message_type(rmq::MessageType::FIFO);
  } else if (message.extension().transactional) {
    system_properties->set_message_type(rmq::MessageType::TRANSACTION);
  } else {
    system_properties->set_message_type(rmq::MessageType::NORMAL);
  }

  if (message.body().size() >= compress_body_threshold_) {
    std::string compressed_body;
    UtilAll::compress(message.body(), compressed_body);
    msg->set_body(compressed_body);
    system_properties->set_body_encoding(rmq::Encoding::GZIP);
  } else {
    msg->set_body(message.body());
    system_properties->set_body_encoding(rmq::Encoding::IDENTITY);
  }

  if (!message.group().empty()) {
    system_properties->set_message_group(message.group());
  }

  system_properties->set_message_id(message.id());
  system_properties->set_queue_id(message_queue.id());

  // Forward user-defined-properties
  for (auto& item : message.properties()) {
    msg->mutable_user_properties()->insert({item.first, item.second});
  }

  // Add into request
  request.mutable_messages()->AddAllocated(msg);
  SPDLOG_TRACE("SendMessageRequest: {}", request.DebugString());
}