in src/qpid/broker/amqp/Translation.cpp [158:279]
boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation::getTransfer()
{
boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> t =
boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer>(dynamic_cast<const qpid::broker::amqp_0_10::MessageTransfer*>(&original.getEncoding()));
if (t) {
return t;//no translation required
} else {
const Message* message = dynamic_cast<const Message*>(&original.getEncoding());
if (message) {
//translate 1.0 message into 0-10
boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer());
qpid::framing::AMQFrame method((qpid::framing::MessageTransferBody(qpid::framing::ProtocolVersion(), EMPTY, 0, 0)));
qpid::framing::AMQFrame header((qpid::framing::AMQHeaderBody()));
qpid::framing::AMQFrame content((qpid::framing::AMQContentBody()));
method.setEof(false);
header.setBof(false);
header.setEof(false);
content.setBof(false);
transfer->getFrames().append(method);
transfer->getFrames().append(header);
qpid::framing::MessageProperties* props =
transfer->getFrames().getHeaders()->get<qpid::framing::MessageProperties>(true);
if (message->isTypedBody()) {
qpid::types::Variant body = message->getTypedBody();
std::string& data = content.castBody<qpid::framing::AMQContentBody>()->getData();
if (body.getType() == qpid::types::VAR_MAP) {
qpid::amqp_0_10::MapCodec::encode(body.asMap(), data);
props->setContentType(qpid::amqp_0_10::MapCodec::contentType);
} else if (body.getType() == qpid::types::VAR_LIST) {
qpid::amqp_0_10::ListCodec::encode(body.asList(), data);
props->setContentType(qpid::amqp_0_10::ListCodec::contentType);
} else if (body.getType() == qpid::types::VAR_STRING) {
data = body.getString();
if (body.getEncoding() == qpid::types::encodings::UTF8 || body.getEncoding() == qpid::types::encodings::ASCII) {
props->setContentType(TEXT_PLAIN);
}
} else {
qpid::types::Variant::List container;
container.push_back(body);
qpid::amqp_0_10::ListCodec::encode(container, data);
props->setContentType(qpid::amqp_0_10::ListCodec::contentType);
}
transfer->getFrames().append(content);
props->setContentLength(data.size());
} else {
qpid::amqp::CharSequence body = message->getBody();
content.castBody<qpid::framing::AMQContentBody>()->getData().assign(body.data, body.size);
transfer->getFrames().append(content);
props->setContentLength(body.size);
}
qpid::amqp::MessageId mid = message->getMessageId();
qpid::framing::Uuid uuid;
switch (mid.type) {
case qpid::amqp::MessageId::NONE:
break;
case qpid::amqp::MessageId::UUID:
case qpid::amqp::MessageId::BYTES:
if (mid.value.bytes.size == 0) break;
if (setMessageId(*props, mid.value.bytes)) break;
// Fallthru
case qpid::amqp::MessageId::ULONG:
QPID_LOG(info, "Skipping message id in translation from 1.0 to 0-10 as it is not a UUID");
break;
}
qpid::amqp::MessageId cid = message->getCorrelationId();
switch (cid.type) {
case qpid::amqp::MessageId::NONE:
break;
case qpid::amqp::MessageId::UUID:
assert(cid.value.bytes.size == 16);
props->setCorrelationId(qpid::framing::Uuid(cid.value.bytes.data).str());
break;
case qpid::amqp::MessageId::BYTES:
if (cid.value.bytes.size) {
props->setCorrelationId(translate(cid.value.bytes));
}
break;
case qpid::amqp::MessageId::ULONG:
props->setCorrelationId(boost::lexical_cast<std::string>(cid.value.ulong));
break;
}
if (message->getReplyToAsCharSequence()) props->setReplyTo(translate(message->getReplyTo(), broker));
if (message->getContentType()) props->setContentType(translate(message->getContentType()));
if (message->getContentEncoding()) props->setContentEncoding(translate(message->getContentEncoding()));
props->setUserId(message->getUserId());
// TODO: FieldTable applicationHeaders;
qpid::amqp::CharSequence ap = message->getApplicationProperties();
if (ap) {
qpid::amqp::Decoder d(ap.data, ap.size);
qpid::amqp_0_10::translate(d.readMap(), props->getApplicationHeaders());
std::string appid = props->getApplicationHeaders().getAsString(APP_ID);
if (!appid.empty()) {
props->setAppId(appid);
}
}
qpid::framing::DeliveryProperties* dp =
transfer->getFrames().getHeaders()->get<qpid::framing::DeliveryProperties>(true);
dp->setPriority(message->getPriority());
if (message->isPersistent()) dp->setDeliveryMode(2);
if (message->getRoutingKey().size()) {
if (message->getRoutingKey().size() > std::numeric_limits<uint8_t>::max()) {
//have to truncate routing key as it is specified to be a str8
dp->setRoutingKey(message->getRoutingKey().substr(0,std::numeric_limits<uint8_t>::max()));
} else {
dp->setRoutingKey(message->getRoutingKey());
}
props->getApplicationHeaders().setString(SUBJECT_KEY, message->getRoutingKey());
}
return transfer.get();
} else {
throw qpid::Exception("Could not write message data in AMQP 0-10 format");
}
}
}