in sdk/core/azure-core-amqp/src/models/amqp_message.cpp [738:913]
// Handles a single decoded top-level field (section) of a message and stores it in
// m_decodedValue.
//
// The field must be a described value whose descriptor is a ulong that maps onto an
// AmqpDescriptors value. The descriptor must also still be present in m_expectedMessageFields;
// that set is narrowed on every call so that sections which have already been seen, or which
// may only appear earlier in the message, are rejected if they show up later.
//
// Throws std::runtime_error when:
//  - the field is not a described value, or its descriptor is not a ulong;
//  - the descriptor is not (or is no longer) in the set of expected fields;
//  - the descriptor is not one of the sections handled below;
//  - the header or properties section cannot be converted by the underlying AMQP library;
//  - an application property has a non-string key or a non-simple value.
void OnAmqpMessageFieldDecoded(AmqpValue value)
{
  if (value.GetType() != AmqpValueType::Described)
  {
    throw std::runtime_error("Decoded message field whose type is NOT described.");
  }
  AmqpDescribed describedType(value.AsDescribed());
  // Note: the check is for an unsigned long (Ulong) descriptor, even though the message text
  // below says "LONG".
  if (describedType.GetDescriptor().GetType() != AmqpValueType::Ulong)
  {
    throw std::runtime_error("Decoded message field MUST be a LONG type.");
  }
  AmqpDescriptors fieldDescriptor(
      static_cast<AmqpDescriptors>(static_cast<uint64_t>(describedType.GetDescriptor())));
  if (m_expectedMessageFields.find(fieldDescriptor) == m_expectedMessageFields.end())
  {
    throw std::runtime_error("Found message field is not in the set of expected fields.");
  }

  // Removes every section which must appear before the message body (and before the footer)
  // from the set of expected fields.
  auto const eraseSectionsBeforeBody = [&]() {
    m_expectedMessageFields.erase(AmqpDescriptors::Header);
    m_expectedMessageFields.erase(AmqpDescriptors::DeliveryAnnotations);
    m_expectedMessageFields.erase(AmqpDescriptors::MessageAnnotations);
    m_expectedMessageFields.erase(AmqpDescriptors::Properties);
    m_expectedMessageFields.erase(AmqpDescriptors::ApplicationProperties);
  };

  // Once we've seen a field, we can remove that field from the set of expected fields,
  // and also the fields which must come before it.
  //
  // The two exceptions are the DataBinary and DataAmqpSequence fields, which can have
  // more than one instance, so they are never removed when they themselves are seen.
  switch (fieldDescriptor)
  {
    case AmqpDescriptors::Header:
      m_expectedMessageFields.erase(AmqpDescriptors::Header);
      break;
    case AmqpDescriptors::DeliveryAnnotations:
      // Once we've seen a DeliveryAnnotations, we no longer expect to see a Header or
      // another DeliveryAnnotations.
      m_expectedMessageFields.erase(AmqpDescriptors::Header);
      m_expectedMessageFields.erase(AmqpDescriptors::DeliveryAnnotations);
      break;
    case AmqpDescriptors::MessageAnnotations:
      // Once we've seen a MessageAnnotations, we no longer expect to see a Header,
      // DeliveryAnnotations, or a MessageAnnotations.
      m_expectedMessageFields.erase(AmqpDescriptors::Header);
      m_expectedMessageFields.erase(AmqpDescriptors::DeliveryAnnotations);
      m_expectedMessageFields.erase(AmqpDescriptors::MessageAnnotations);
      break;
    case AmqpDescriptors::Properties:
      // Once we've seen a Properties, we no longer expect to see a Header, either kind of
      // annotations, or another Properties.
      m_expectedMessageFields.erase(AmqpDescriptors::Header);
      m_expectedMessageFields.erase(AmqpDescriptors::DeliveryAnnotations);
      m_expectedMessageFields.erase(AmqpDescriptors::MessageAnnotations);
      m_expectedMessageFields.erase(AmqpDescriptors::Properties);
      break;
    case AmqpDescriptors::ApplicationProperties:
      // ApplicationProperties is the last of the pre-body sections, so all of them
      // (including ApplicationProperties itself) are now closed.
      eraseSectionsBeforeBody();
      break;
    case AmqpDescriptors::DataAmqpSequence:
      eraseSectionsBeforeBody();
      // When we see a DataAmqpSequence, we no longer expect to see any other data type.
      m_expectedMessageFields.erase(AmqpDescriptors::DataAmqpValue);
      m_expectedMessageFields.erase(AmqpDescriptors::DataBinary);
      break;
    case AmqpDescriptors::DataAmqpValue:
      eraseSectionsBeforeBody();
      // When we see a DataAmqpValue, we no longer expect to see any data section at all,
      // including another DataAmqpValue.
      m_expectedMessageFields.erase(AmqpDescriptors::DataAmqpValue);
      m_expectedMessageFields.erase(AmqpDescriptors::DataAmqpSequence);
      m_expectedMessageFields.erase(AmqpDescriptors::DataBinary);
      break;
    case AmqpDescriptors::DataBinary:
      eraseSectionsBeforeBody();
      // When we see a DataBinary, we no longer expect to see any other data type.
      m_expectedMessageFields.erase(AmqpDescriptors::DataAmqpValue);
      m_expectedMessageFields.erase(AmqpDescriptors::DataAmqpSequence);
      break;
    case AmqpDescriptors::Footer:
      eraseSectionsBeforeBody();
      // When we see a Footer, we no longer expect to see any data section or another Footer.
      m_expectedMessageFields.erase(AmqpDescriptors::DataBinary);
      m_expectedMessageFields.erase(AmqpDescriptors::DataAmqpValue);
      m_expectedMessageFields.erase(AmqpDescriptors::DataAmqpSequence);
      m_expectedMessageFields.erase(AmqpDescriptors::Footer);
      break;
    default:
      throw std::runtime_error("Unknown message descriptor.");
  }

  // Convert the section into its strongly typed form and store it in the decoded message.
  switch (fieldDescriptor)
  {
    case AmqpDescriptors::Header: {
      HEADER_HANDLE h{};
      // A non-zero result from the underlying library is treated as a conversion failure.
      if (amqpvalue_get_header(_detail::AmqpValueFactory::ToImplementation(value), &h))
      {
        throw std::runtime_error("Could not convert field to header.");
      }
      // Hand the raw handle to the unique handle before converting it.
      UniqueMessageHeaderHandle messageHeader;
      messageHeader.reset(h);
      m_decodedValue.Header = _detail::MessageHeaderFactory::FromImplementation(messageHeader);
      break;
    }
    case AmqpDescriptors::DeliveryAnnotations:
      m_decodedValue.DeliveryAnnotations = describedType.GetValue().AsAnnotations();
      break;
    case AmqpDescriptors::MessageAnnotations:
      m_decodedValue.MessageAnnotations = describedType.GetValue().AsAnnotations();
      break;
    case AmqpDescriptors::Properties: {
      PROPERTIES_HANDLE h{};
      // A non-zero result from the underlying library is treated as a conversion failure.
      if (amqpvalue_get_properties(_detail::AmqpValueFactory::ToImplementation(value), &h))
      {
        throw std::runtime_error("Could not convert field to properties.");
      }
      // Hand the raw handle to the unique handle before converting it.
      UniquePropertiesHandle properties;
      properties.reset(h);
      m_decodedValue.Properties
          = _detail::MessagePropertiesFactory::FromImplementation(properties);
      break;
    }
    case AmqpDescriptors::ApplicationProperties: {
      auto propertyMap = describedType.GetValue().AsMap();
      for (auto const& val : propertyMap)
      {
        if (val.first.GetType() != AmqpValueType::String)
        {
          throw std::runtime_error("Key of applications properties must be a string.");
        }
        // Container-like and described values are rejected as application property values.
        if ((val.second.GetType() == AmqpValueType::List)
            || (val.second.GetType() == AmqpValueType::Map)
            || (val.second.GetType() == AmqpValueType::Composite)
            || (val.second.GetType() == AmqpValueType::Described))
        {
          throw std::runtime_error(
              "Message Application Property values must be simple value types");
        }
        m_decodedValue.ApplicationProperties.emplace(
            static_cast<std::string>(val.first), val.second);
      }
      break;
    }
    case AmqpDescriptors::DataAmqpValue:
      m_decodedValue.SetBody(describedType.GetValue());
      break;
    case AmqpDescriptors::DataAmqpSequence:
      m_decodedValue.SetBody(describedType.GetValue().AsList());
      break;
    case AmqpDescriptors::DataBinary:
      // Each call to SetBody will append the binary value to the vector of binary bodies.
      m_decodedValue.SetBody(describedType.GetValue().AsBinary());
      break;
    case AmqpDescriptors::Footer:
      m_decodedValue.Footer = describedType.GetValue().AsAnnotations();
      break;
    default:
      throw std::runtime_error("Unknown message descriptor.");
  }
}