in cpp/source/rocketmq/ProducerImpl.cpp [110:193]
void ProducerImpl::wrapSendMessageRequest(const Message& message, SendMessageRequest& request,
const rmq::MessageQueue& message_queue) {
auto msg = new rmq::Message();
msg->mutable_topic()->set_resource_namespace(resourceNamespace());
msg->mutable_topic()->set_name(message.topic());
auto system_properties = msg->mutable_system_properties();
// Handle Tag
if (!message.tag().empty()) {
system_properties->set_tag(message.tag());
}
// Handle Key
const auto& keys = message.keys();
if (!keys.empty()) {
system_properties->mutable_keys()->Add(keys.begin(), keys.end());
}
// TraceContext
if (!message.traceContext().empty()) {
const auto& trace_context = message.traceContext();
if (!trace_context.empty()) {
system_properties->set_trace_context(trace_context);
}
}
// Delivery Timestamp
if (message.deliveryTimestamp().time_since_epoch().count()) {
auto delivery_timestamp = message.deliveryTimestamp();
if (delivery_timestamp.time_since_epoch().count()) {
auto duration = delivery_timestamp.time_since_epoch();
system_properties->set_delivery_attempt(std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());
auto mutable_delivery_timestamp = system_properties->mutable_delivery_timestamp();
mutable_delivery_timestamp->set_seconds(std::chrono::duration_cast<std::chrono::seconds>(duration).count());
mutable_delivery_timestamp->set_nanos(std::chrono::duration_cast<std::chrono::nanoseconds>(duration).count() % 1000000000);
}
}
// Born-time
auto duration = absl::Now() - absl::UnixEpoch();
int64_t seconds = absl::ToInt64Seconds(duration);
system_properties->mutable_born_timestamp()->set_seconds(seconds);
system_properties->mutable_born_timestamp()->set_nanos(absl::ToInt64Nanoseconds(duration - absl::Seconds(seconds)));
system_properties->set_born_host(UtilAll::hostname());
if (message.deliveryTimestamp().time_since_epoch().count()) {
system_properties->set_message_type(rmq::MessageType::DELAY);
} else if (!message.group().empty()) {
system_properties->set_message_type(rmq::MessageType::FIFO);
} else if (message.extension().transactional) {
system_properties->set_message_type(rmq::MessageType::TRANSACTION);
} else {
system_properties->set_message_type(rmq::MessageType::NORMAL);
}
if (message.body().size() >= compress_body_threshold_) {
std::string compressed_body;
UtilAll::compress(message.body(), compressed_body);
msg->set_body(compressed_body);
system_properties->set_body_encoding(rmq::Encoding::GZIP);
} else {
msg->set_body(message.body());
system_properties->set_body_encoding(rmq::Encoding::IDENTITY);
}
if (!message.group().empty()) {
system_properties->set_message_group(message.group());
}
system_properties->set_message_id(message.id());
system_properties->set_queue_id(message_queue.id());
// Forward user-defined-properties
for (auto& item : message.properties()) {
msg->mutable_user_properties()->insert({item.first, item.second});
}
// Add into request
request.mutable_messages()->AddAllocated(msg);
SPDLOG_TRACE("SendMessageRequest: {}", request.DebugString());
}