in cpp/source/client/ClientManagerImpl.cpp [767:952]
MessageConstSharedPtr ClientManagerImpl::wrapMessage(const rmq::Message& item) {
assert(item.topic().resource_namespace() == resource_namespace_);
auto builder = Message::newBuilder();
// base
builder.withTopic(item.topic().name());
const auto& system_properties = item.system_properties();
// Tag
if (system_properties.has_tag()) {
builder.withTag(system_properties.tag());
}
// Keys
std::vector<std::string> keys;
for (const auto& key : system_properties.keys()) {
keys.push_back(key);
}
if (!keys.empty()) {
builder.withKeys(std::move(keys));
}
// Message-Id
const auto& message_id = system_properties.message_id();
builder.withId(message_id);
// Validate body digest
const rmq::Digest& digest = system_properties.body_digest();
bool body_digest_match = false;
if (item.body().empty()) {
SPDLOG_WARN("Body of message[topic={}, msgId={}] is empty", item.topic().name(),
item.system_properties().message_id());
body_digest_match = true;
} else {
switch (digest.type()) {
case rmq::DigestType::CRC32: {
std::string checksum;
bool success = MixAll::crc32(item.body(), checksum);
if (success) {
body_digest_match = (digest.checksum() == checksum);
if (body_digest_match) {
SPDLOG_DEBUG("Message body CRC32 checksum validation passed.");
} else {
SPDLOG_WARN("Body CRC32 checksum validation failed. Actual: {}, expect: {}", checksum, digest.checksum());
}
} else {
SPDLOG_WARN("Failed to calculate CRC32 checksum. Skip.");
}
break;
}
case rmq::DigestType::MD5: {
std::string checksum;
bool success = MixAll::md5(item.body(), checksum);
if (success) {
body_digest_match = (digest.checksum() == checksum);
if (body_digest_match) {
SPDLOG_DEBUG("Body of message[{}] MD5 checksum validation passed.", message_id);
} else {
SPDLOG_WARN("Body of message[{}] MD5 checksum validation failed. Expect: {}, Actual: {}", message_id,
digest.checksum(), checksum);
}
} else {
SPDLOG_WARN("Failed to calculate MD5 digest. Skip.");
body_digest_match = true;
}
break;
}
case rmq::DigestType::SHA1: {
std::string checksum;
bool success = MixAll::sha1(item.body(), checksum);
if (success) {
body_digest_match = (checksum == digest.checksum());
if (body_digest_match) {
SPDLOG_DEBUG("Body of message[{}] SHA1 checksum validation passed", message_id);
} else {
SPDLOG_WARN("Body of message[{}] SHA1 checksum validation failed. Expect: {}, Actual: {}", message_id,
digest.checksum(), checksum);
}
} else {
SPDLOG_WARN("Failed to calculate SHA1 digest for message[{}]. Skip.", message_id);
}
break;
}
default: {
SPDLOG_WARN("Unsupported message body digest algorithm");
body_digest_match = true;
break;
}
}
}
if (!body_digest_match) {
SPDLOG_WARN("Message body checksum failed. MsgId={}", system_properties.message_id());
// TODO: NACK it immediately
return nullptr;
}
// Body encoding
switch (system_properties.body_encoding()) {
case rmq::Encoding::GZIP: {
std::string uncompressed;
UtilAll::uncompress(item.body(), uncompressed);
builder.withBody(uncompressed);
break;
}
case rmq::Encoding::IDENTITY: {
builder.withBody(item.body());
break;
}
default: {
SPDLOG_WARN("Unsupported encoding algorithm");
break;
}
}
// User-properties
std::unordered_map<std::string, std::string> properties;
for (const auto& it : item.user_properties()) {
properties.insert(std::make_pair(it.first, it.second));
}
if (!properties.empty()) {
builder.withProperties(properties);
}
// Born-timestamp
if (system_properties.has_born_timestamp()) {
auto born_timestamp = google::protobuf::util::TimeUtil::TimestampToMilliseconds(system_properties.born_timestamp());
builder.withBornTime(absl::ToChronoTime(absl::FromUnixMillis(born_timestamp)));
}
// Born-host
builder.withBornHost(system_properties.born_host());
// Trace-context
if (system_properties.has_trace_context()) {
builder.withTraceContext(system_properties.trace_context());
}
auto message = builder.build();
const Message* raw = message.release();
Message* msg = const_cast<Message*>(raw);
Extension& extension = msg->mutableExtension();
// Receipt-handle
extension.receipt_handle = system_properties.receipt_handle();
// Store-timestamp
if (system_properties.has_store_timestamp()) {
auto store_timestamp =
google::protobuf::util::TimeUtil::TimestampToMilliseconds(system_properties.store_timestamp());
extension.store_time = absl::ToChronoTime(absl::FromUnixMillis(store_timestamp));
}
// Store-host
extension.store_host = system_properties.store_host();
// Process one-of: delivery-timestamp and delay-level.
if (system_properties.has_delivery_timestamp()) {
auto delivery_timestamp_ms =
google::protobuf::util::TimeUtil::TimestampToMilliseconds(system_properties.delivery_timestamp());
extension.delivery_timepoint = absl::ToChronoTime(absl::FromUnixMillis(delivery_timestamp_ms));
}
// Queue-id
extension.queue_id = system_properties.queue_id();
// Queue-offset
extension.offset = system_properties.queue_offset();
// Invisible-period
if (system_properties.has_invisible_duration()) {
auto invisible_period = std::chrono::seconds(system_properties.invisible_duration().seconds()) +
std::chrono::nanoseconds(system_properties.invisible_duration().nanos());
extension.invisible_period = invisible_period;
}
// Delivery attempt
extension.delivery_attempt = system_properties.delivery_attempt();
// Decoded Time-Point
extension.decode_time = std::chrono::system_clock::now();
return MessageConstSharedPtr(raw);
}