boost::intrusive_ptr Translation::getTransfer()

in src/qpid/broker/amqp/Translation.cpp [158:279]


boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation::getTransfer()
{
    boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> t =
        boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer>(dynamic_cast<const qpid::broker::amqp_0_10::MessageTransfer*>(&original.getEncoding()));
    if (t) {
        return t;//no translation required
    } else {
        const Message* message = dynamic_cast<const Message*>(&original.getEncoding());
        if (message) {
            //translate 1.0 message into 0-10
            boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer());
            qpid::framing::AMQFrame method((qpid::framing::MessageTransferBody(qpid::framing::ProtocolVersion(), EMPTY, 0, 0)));
            qpid::framing::AMQFrame header((qpid::framing::AMQHeaderBody()));
            qpid::framing::AMQFrame content((qpid::framing::AMQContentBody()));
            method.setEof(false);
            header.setBof(false);
            header.setEof(false);
            content.setBof(false);

            transfer->getFrames().append(method);
            transfer->getFrames().append(header);

            qpid::framing::MessageProperties* props =
                transfer->getFrames().getHeaders()->get<qpid::framing::MessageProperties>(true);

            if (message->isTypedBody()) {
                qpid::types::Variant body = message->getTypedBody();
                std::string& data = content.castBody<qpid::framing::AMQContentBody>()->getData();
                if (body.getType() == qpid::types::VAR_MAP) {
                    qpid::amqp_0_10::MapCodec::encode(body.asMap(), data);
                    props->setContentType(qpid::amqp_0_10::MapCodec::contentType);
                } else if (body.getType() == qpid::types::VAR_LIST) {
                    qpid::amqp_0_10::ListCodec::encode(body.asList(), data);
                    props->setContentType(qpid::amqp_0_10::ListCodec::contentType);
                } else if (body.getType() == qpid::types::VAR_STRING) {
                    data = body.getString();
                    if (body.getEncoding() == qpid::types::encodings::UTF8 || body.getEncoding() == qpid::types::encodings::ASCII) {
                        props->setContentType(TEXT_PLAIN);
                    }
                } else {
                    qpid::types::Variant::List container;
                    container.push_back(body);
                    qpid::amqp_0_10::ListCodec::encode(container, data);
                    props->setContentType(qpid::amqp_0_10::ListCodec::contentType);
                }
                transfer->getFrames().append(content);
                props->setContentLength(data.size());
            } else {
                qpid::amqp::CharSequence body = message->getBody();
                content.castBody<qpid::framing::AMQContentBody>()->getData().assign(body.data, body.size);
                transfer->getFrames().append(content);

                props->setContentLength(body.size);
            }

            qpid::amqp::MessageId mid = message->getMessageId();
            qpid::framing::Uuid uuid;
            switch (mid.type) {
              case qpid::amqp::MessageId::NONE:
                break;
              case qpid::amqp::MessageId::UUID:
              case qpid::amqp::MessageId::BYTES:
                if (mid.value.bytes.size == 0) break;
                if (setMessageId(*props, mid.value.bytes)) break;
                // Fallthru
              case qpid::amqp::MessageId::ULONG:
                QPID_LOG(info, "Skipping message id in translation from 1.0 to 0-10 as it is not a UUID");
                break;
            }

            qpid::amqp::MessageId cid = message->getCorrelationId();
            switch (cid.type) {
              case qpid::amqp::MessageId::NONE:
                break;
              case qpid::amqp::MessageId::UUID:
                assert(cid.value.bytes.size == 16);
                props->setCorrelationId(qpid::framing::Uuid(cid.value.bytes.data).str());
                break;
              case qpid::amqp::MessageId::BYTES:
                if (cid.value.bytes.size) {
                    props->setCorrelationId(translate(cid.value.bytes));
                }
                break;
              case qpid::amqp::MessageId::ULONG:
                props->setCorrelationId(boost::lexical_cast<std::string>(cid.value.ulong));
                break;
            }
            if (message->getReplyToAsCharSequence()) props->setReplyTo(translate(message->getReplyTo(), broker));
            if (message->getContentType()) props->setContentType(translate(message->getContentType()));
            if (message->getContentEncoding()) props->setContentEncoding(translate(message->getContentEncoding()));
            props->setUserId(message->getUserId());
            // TODO: FieldTable applicationHeaders;
            qpid::amqp::CharSequence ap = message->getApplicationProperties();
            if (ap) {
                qpid::amqp::Decoder d(ap.data, ap.size);
                qpid::amqp_0_10::translate(d.readMap(), props->getApplicationHeaders());
                std::string appid = props->getApplicationHeaders().getAsString(APP_ID);
                if (!appid.empty()) {
                    props->setAppId(appid);
                }
            }

            qpid::framing::DeliveryProperties* dp =
                transfer->getFrames().getHeaders()->get<qpid::framing::DeliveryProperties>(true);
            dp->setPriority(message->getPriority());
            if (message->isPersistent()) dp->setDeliveryMode(2);
            if (message->getRoutingKey().size()) {
                if (message->getRoutingKey().size() > std::numeric_limits<uint8_t>::max()) {
                    //have to truncate routing key as it is specified to be a str8
                    dp->setRoutingKey(message->getRoutingKey().substr(0,std::numeric_limits<uint8_t>::max()));
                } else {
                    dp->setRoutingKey(message->getRoutingKey());
                }
                props->getApplicationHeaders().setString(SUBJECT_KEY, message->getRoutingKey());
            }

            return transfer.get();
        } else {
            throw qpid::Exception("Could not write message data in AMQP 0-10 format");
        }
    }
}