in sdk/core/azure-core-amqp/src/models/amqp_message.cpp [92:311]
/**
 * @brief Builds an AmqpMessage from a native (uAMQP or Rust AMQP) message.
 *
 * Reads the header, properties, delivery annotations, message annotations, application
 * properties, delivery tag (uAMQP builds only), footer and body from the native message and
 * stores them in a newly created AmqpMessage.
 *
 * @param message Native message to convert. May be null.
 * @return The converted message, or nullptr when `message` is null.
 *
 * @throws std::runtime_error when the application properties cannot be decoded (descriptor or
 * described value cannot be retrieved, wrong descriptor, value is not a map, or a key is not a
 * string), or when the body type is invalid or unknown.
 *
 * @remarks A non-zero return from any of the other native accessors is not treated as an error:
 * the corresponding section of the result keeps the value it had after construction, and a body
 * element that cannot be read is skipped.
 */
std::shared_ptr<AmqpMessage> _detail::AmqpMessageFactory::FromImplementation(
    MessageImplementation* message)
{
  if (message == nullptr)
  {
    return nullptr;
  }

  auto rv{std::make_shared<AmqpMessage>()};
  rv->Header = _detail::MessageHeaderFactory::FromImplementation(GetHeaderFromMessage(message));
  rv->Properties
      = _detail::MessagePropertiesFactory::FromImplementation(GetPropertiesFromMessage(message));

  // Delivery annotations.
  {
    // message_get_delivery_annotations returns a clone of the annotations.
    AmqpValueImplementation* annotationsVal{};
    if (!message_get_delivery_annotations(message, &annotationsVal) && annotationsVal != nullptr)
    {
      UniqueAmqpValueHandle deliveryAnnotations(annotationsVal);
      rv->DeliveryAnnotations
          = Models::_detail::AmqpValueFactory::FromImplementation(deliveryAnnotations)
                .AsAnnotations();
    }
  }

  // Message annotations.
  {
    // message_get_message_annotations returns a clone of the message annotations.
    AmqpValueImplementation* messageAnnotations{};
    if (!message_get_message_annotations(message, &messageAnnotations) && messageAnnotations)
    {
      rv->MessageAnnotations = Models::_detail::AmqpValueFactory::FromImplementation(
                                   UniqueAmqpValueHandle{messageAnnotations})
                                   .AsAnnotations();
    }
  }

  // Application properties.
  {
    /*
     * The ApplicationProperties field in an AMQP message for uAMQP expects that the map value
     * is wrapped as a described value. A described value has a ULONG descriptor value and a
     * value type.
     *
     * Making things even more interesting, the ApplicationProperties field in an uAMQP message
     * is asymmetric.
     *
     * The MessageSender class will wrap ApplicationProperties in a described value, so when
     * setting application properties, the described value must NOT be present, but when
     * decoding application properties, this code has to be able to handle both when the
     * described value is present and when it is not.
     */
    AmqpValueImplementation* properties{};
    if (!message_get_application_properties(message, &properties) && properties)
    {
      // Wrap the returned value in a handle before any of the throws below can run.
      UniqueAmqpValueHandle describedProperties(properties);

      // The map holding the properties: either the described value's payload or the value itself.
      AmqpValueImplementation* value{};
      if (amqpvalue_get_type(describedProperties.get()) == AMQP_TYPE_DESCRIBED)
      {
#if ENABLE_UAMQP
        auto describedType = amqpvalue_get_inplace_descriptor(describedProperties.get());
#else
        AmqpValueImplementation* describedType{};
        if (amqpvalue_get_inplace_descriptor(describedProperties.get(), &describedType))
        {
          throw std::runtime_error("Could not retrieve application properties described type.");
        }
#endif
        uint64_t describedTypeValue{};
        if (amqpvalue_get_ulong(describedType, &describedTypeValue))
        {
          throw std::runtime_error("Could not retrieve application properties described type.");
        }
        if (describedTypeValue
            != static_cast<uint64_t>(
                Azure::Core::Amqp::_detail::AmqpDescriptors::ApplicationProperties))
        {
          throw std::runtime_error("Application Properties are not the corect described type.");
        }
#if ENABLE_UAMQP
        value = amqpvalue_get_inplace_described_value(describedProperties.get());
#elif ENABLE_RUST_AMQP
        if (amqpvalue_get_inplace_described_value(describedProperties.get(), &value))
        {
          throw std::runtime_error(
              "Could not retrieve application properties described value.");
        }
#endif
        // The uAMQP accessor reports failure through its return value rather than a status
        // code, so a missing described value has to be detected here before it is inspected.
        if (value == nullptr)
        {
          throw std::runtime_error(
              "Could not retrieve application properties described value.");
        }
      }
      else
      {
        value = describedProperties.get();
      }

      if (amqpvalue_get_type(value) != AMQP_TYPE_MAP)
      {
        throw std::runtime_error("Application Properties must be a map?!");
      }

      // `value` is either `describedProperties` itself or was obtained through an in-place
      // accessor, so a clone is what gets handed to the new handle.
      auto appProperties = AmqpMap(_detail::AmqpValueFactory::FromImplementation(
          _detail::UniqueAmqpValueHandle{amqpvalue_clone(value)}));
      for (auto const& val : appProperties)
      {
        if (val.first.GetType() != AmqpValueType::String)
        {
          throw std::runtime_error("Key of Application Properties must be a string.");
        }
        rv->ApplicationProperties.emplace(
            std::make_pair(static_cast<std::string>(val.first), val.second));
      }
    }
  }

#if ENABLE_UAMQP
  // Delivery tag (only read in uAMQP builds).
  {
    AmqpValueImplementation* deliveryTagVal{};
    if (!message_get_delivery_tag(message, &deliveryTagVal))
    {
      UniqueAmqpValueHandle deliveryTag(deliveryTagVal);
      rv->DeliveryTag = _detail::AmqpValueFactory::FromImplementation(deliveryTag);
    }
  }
#endif

  // Footer.
  {
    AmqpValueImplementation* footerVal{};
    if (!message_get_footer(message, &footerVal) && footerVal)
    {
      UniqueAmqpValueHandle footerAnnotations(footerVal);
      rv->Footer
          = _detail::AmqpValueFactory::FromImplementation(footerAnnotations).AsAnnotations();
    }
  }

  // Body.
  {
    NativeMessageBodyType bodyType{};
    if (!message_get_body_type(message, &bodyType))
    {
      switch (bodyType)
      {
        case MESSAGE_BODY_TYPE_NONE:
          rv->BodyType = MessageBodyType::None;
          break;

        case MESSAGE_BODY_TYPE_DATA: {
          size_t dataCount{};
          if (!message_get_body_amqp_data_count(message, &dataCount))
          {
            // Index with size_t so the counter has the same width as the count.
            for (size_t i = 0; i < dataCount; i += 1)
            {
#if ENABLE_UAMQP
              BINARY_DATA binaryValue;
              if (!message_get_body_amqp_data_in_place(message, i, &binaryValue))
              {
                rv->m_binaryDataBody.push_back(AmqpBinaryData(std::vector<std::uint8_t>(
                    binaryValue.bytes, binaryValue.bytes + binaryValue.length)));
              }
#elif ENABLE_RUST_AMQP
              uint8_t* data{};
              uint32_t size{};
              if (!message_get_body_amqp_data_in_place(message, i, &data, &size))
              {
                rv->m_binaryDataBody.push_back(
                    AmqpBinaryData(std::vector<std::uint8_t>(data, data + size)));
              }
#endif
            }
          }
          rv->BodyType = MessageBodyType::Data;
          break;
        }

        case MESSAGE_BODY_TYPE_SEQUENCE: {
          size_t sequenceCount{};
          if (!message_get_body_amqp_sequence_count(message, &sequenceCount))
          {
            for (size_t i = 0; i < sequenceCount; i += 1)
            {
              AmqpValueImplementation* sequence{};
              if (!message_get_body_amqp_sequence_in_place(message, i, &sequence))
              {
#if ENABLE_UAMQP
                rv->m_amqpSequenceBody.push_back(_detail::AmqpValueFactory::FromImplementation(
                    _detail::UniqueAmqpValueHandle{amqpvalue_clone(sequence)}));
#elif ENABLE_RUST_AMQP
                // Rust AMQP cannot return an in-place value - the value returned is already
                // cloned.
                rv->m_amqpSequenceBody.push_back(_detail::AmqpValueFactory::FromImplementation(
                    _detail::UniqueAmqpValueHandle{sequence}));
#endif
              }
            }
          }
          rv->BodyType = MessageBodyType::Sequence;
          break;
        }

        case MESSAGE_BODY_TYPE_VALUE: {
          AmqpValueImplementation* bodyValue{};
          if (!message_get_body_amqp_value_in_place(message, &bodyValue))
          {
#if ENABLE_UAMQP
            rv->m_amqpValueBody = _detail::AmqpValueFactory::FromImplementation(
                _detail::UniqueAmqpValueHandle{amqpvalue_clone(bodyValue)});
#elif ENABLE_RUST_AMQP
            rv->m_amqpValueBody = _detail::AmqpValueFactory::FromImplementation(
                _detail::UniqueAmqpValueHandle{bodyValue});
#endif
          }
          rv->BodyType = MessageBodyType::Value;
          break;
        }

#if ENABLE_UAMQP
        case MESSAGE_BODY_TYPE_INVALID:
          throw std::runtime_error("Invalid message body type.");
#endif
        default:
          throw std::runtime_error("Unknown body type.");
      }
    }
  }

  return rv;
}