MessageConstSharedPtr ClientManagerImpl::wrapMessage()

in cpp/source/client/ClientManagerImpl.cpp [791:975]


// Converts a protobuf `rmq::Message` into the client-side `Message` model.
//
// Steps, in order:
//   1. Copy topic, tag, keys and message-id into a message builder.
//   2. Validate the body against the digest carried in the system properties.
//      Validation is skipped (treated as a match) when the body is empty, when
//      the digest algorithm is not one of CRC32/MD5/SHA1, or when the local
//      digest could not be computed.
//   3. Decode the body according to its declared encoding (GZIP or IDENTITY).
//   4. Copy user properties, born time/host and trace context.
//   5. Build the message and populate its extension (receipt handle, store
//      time/host, delivery time-point, queue id/offset, invisible period,
//      delivery attempt, decode time).
//
// @param item Message as received over the wire.
// @return Shared pointer to the converted message, or nullptr when the computed
//         body digest does not match the expected checksum.
MessageConstSharedPtr ClientManagerImpl::wrapMessage(const rmq::Message& item) {
  auto builder = Message::newBuilder();

  // base
  builder.withTopic(item.topic().name());

  const auto& system_properties = item.system_properties();

  // Tag
  if (system_properties.has_tag()) {
    builder.withTag(system_properties.tag());
  }

  // Keys
  const auto& key_list = system_properties.keys();
  std::vector<std::string> keys(key_list.begin(), key_list.end());
  if (!keys.empty()) {
    builder.withKeys(std::move(keys));
  }

  // Message-Id
  const auto& message_id = system_properties.message_id();
  builder.withId(message_id);

  // Validate body digest
  const rmq::Digest& digest = system_properties.body_digest();
  bool body_digest_match = false;
  if (item.body().empty()) {
    // Nothing to verify against; accept the message as-is.
    SPDLOG_WARN("Body of message[topic={}, msgId={}] is empty", item.topic().name(), message_id);
    body_digest_match = true;
  } else {
    switch (digest.type()) {
      case rmq::DigestType::CRC32: {
        std::string checksum;
        bool success = MixAll::crc32(item.body(), checksum);
        if (success) {
          body_digest_match = (digest.checksum() == checksum);
          if (body_digest_match) {
            SPDLOG_DEBUG("Message body CRC32 checksum validation passed.");
          } else {
            SPDLOG_WARN("Body CRC32 checksum validation failed. Actual: {}, expect: {}", checksum, digest.checksum());
          }
        } else {
          SPDLOG_WARN("Failed to calculate CRC32 checksum. Skip.");
          // Being unable to compute the digest locally is not evidence of a corrupted body: skip validation,
          // consistent with the MD5 branch.
          body_digest_match = true;
        }
        break;
      }
      case rmq::DigestType::MD5: {
        std::string checksum;
        bool success = MixAll::md5(item.body(), checksum);
        if (success) {
          body_digest_match = (digest.checksum() == checksum);
          if (body_digest_match) {
            SPDLOG_DEBUG("Body of message[{}] MD5 checksum validation passed.", message_id);
          } else {
            SPDLOG_WARN("Body of message[{}] MD5 checksum validation failed. Expect: {}, Actual: {}", message_id,
                        digest.checksum(), checksum);
          }
        } else {
          SPDLOG_WARN("Failed to calculate MD5 digest. Skip.");
          body_digest_match = true;
        }
        break;
      }
      case rmq::DigestType::SHA1: {
        std::string checksum;
        bool success = MixAll::sha1(item.body(), checksum);
        if (success) {
          body_digest_match = (checksum == digest.checksum());
          if (body_digest_match) {
            SPDLOG_DEBUG("Body of message[{}] SHA1 checksum validation passed", message_id);
          } else {
            SPDLOG_WARN("Body of message[{}] SHA1 checksum validation failed. Expect: {}, Actual: {}", message_id,
                        digest.checksum(), checksum);
          }
        } else {
          SPDLOG_WARN("Failed to calculate SHA1 digest for message[{}]. Skip.", message_id);
          // Same policy as CRC32/MD5: an incomputable digest skips validation instead of dropping the message.
          body_digest_match = true;
        }
        break;
      }
      default: {
        SPDLOG_WARN("Unsupported message body digest algorithm");
        body_digest_match = true;
        break;
      }
    }
  }

  if (!body_digest_match) {
    SPDLOG_WARN("Message body checksum failed. MsgId={}", message_id);
    // TODO: NACK it immediately
    return nullptr;
  }

  // Body encoding
  switch (system_properties.body_encoding()) {
    case rmq::Encoding::GZIP: {
      std::string uncompressed;
      // NOTE(review): the outcome of UtilAll::uncompress is not inspected here; if decompression can fail, the body
      // is set to whatever `uncompressed` holds afterwards. Confirm against UtilAll.
      UtilAll::uncompress(item.body(), uncompressed);
      builder.withBody(uncompressed);
      break;
    }
    case rmq::Encoding::IDENTITY: {
      builder.withBody(item.body());
      break;
    }
    default: {
      // The body is left unset on the builder for unknown encodings.
      SPDLOG_WARN("Unsupported encoding algorithm");
      break;
    }
  }

  // User-properties
  std::unordered_map<std::string, std::string> properties;
  for (const auto& entry : item.user_properties()) {
    properties.emplace(entry.first, entry.second);
  }
  if (!properties.empty()) {
    builder.withProperties(properties);
  }

  // Born-timestamp
  if (system_properties.has_born_timestamp()) {
    auto born_timestamp = google::protobuf::util::TimeUtil::TimestampToMilliseconds(system_properties.born_timestamp());
    builder.withBornTime(absl::ToChronoTime(absl::FromUnixMillis(born_timestamp)));
  }

  // Born-host
  builder.withBornHost(system_properties.born_host());

  // Trace-context
  if (system_properties.has_trace_context()) {
    builder.withTraceContext(system_properties.trace_context());
  }

  // Keep `message` as the owner until the very end so that the message is not leaked should any of the
  // assignments below throw.
  auto message = builder.build();

  // The extension is filled in after construction, which requires mutable access to the freshly built message.
  const Message* raw = message.get();
  Message* msg = const_cast<Message*>(raw);
  Extension& extension = msg->mutableExtension();

  // Receipt-handle
  extension.receipt_handle = system_properties.receipt_handle();

  // Store-timestamp
  if (system_properties.has_store_timestamp()) {
    auto store_timestamp =
        google::protobuf::util::TimeUtil::TimestampToMilliseconds(system_properties.store_timestamp());
    extension.store_time = absl::ToChronoTime(absl::FromUnixMillis(store_timestamp));
  }

  // Store-host
  extension.store_host = system_properties.store_host();

  // Delivery-timestamp (only set when present in the system properties)
  if (system_properties.has_delivery_timestamp()) {
    auto delivery_timestamp_ms =
        google::protobuf::util::TimeUtil::TimestampToMilliseconds(system_properties.delivery_timestamp());
    extension.delivery_timepoint = absl::ToChronoTime(absl::FromUnixMillis(delivery_timestamp_ms));
  }

  // Queue-id
  extension.queue_id = system_properties.queue_id();

  // Queue-offset
  extension.offset = system_properties.queue_offset();

  // Invisible-period: protobuf Duration is split into seconds + nanos; recombine into one chrono duration.
  if (system_properties.has_invisible_duration()) {
    const auto& invisible_duration = system_properties.invisible_duration();
    extension.invisible_period =
        std::chrono::seconds(invisible_duration.seconds()) + std::chrono::nanoseconds(invisible_duration.nanos());
  }

  // Delivery attempt
  extension.delivery_attempt = system_properties.delivery_attempt();

  // Decoded Time-Point
  extension.decode_time = std::chrono::system_clock::now();

  // Hand ownership over to the shared pointer only now that the message is fully populated.
  return MessageConstSharedPtr(message.release());
}