bool ActiveMQMessageTransformation::transformMessage()

in activemq-cpp/src/main/activemq/util/ActiveMQMessageTransformation.cpp [97:254]


bool ActiveMQMessageTransformation::transformMessage(cms::Message* message, ActiveMQConnection* connection, Message** amqMessage) {

    if (message == NULL) {
        throw NullPointerException(__FILE__, __LINE__, "Provided source cms::Message pointer was NULL");
    }

    if (amqMessage == NULL) {
        throw NullPointerException(__FILE__, __LINE__, "Provided target commands::Message pointer was NULL");
    }

    *amqMessage = dynamic_cast<Message*>(message);

    if (*amqMessage != NULL) {
        return false;
    } else {

        if (dynamic_cast<cms::BytesMessage*>(message) != NULL) {
            cms::BytesMessage* bytesMsg = dynamic_cast<cms::BytesMessage*>(message);
            bytesMsg->reset();
            ActiveMQBytesMessage* msg = new ActiveMQBytesMessage();
            msg->setConnection(connection);
            try {
                for (;;) {
                    // Reads a byte from the message stream until the stream is empty
                    msg->writeByte(bytesMsg->readByte());
                }
            } catch (cms::MessageEOFException& e) {
                // if an end of message stream as expected
            } catch (cms::CMSException& e) {
            }

            *amqMessage = msg;
        } else if (dynamic_cast<cms::MapMessage*>(message) != NULL) {
            cms::MapMessage* mapMsg = dynamic_cast<cms::MapMessage*>(message);
            ActiveMQMapMessage* msg = new ActiveMQMapMessage();
            msg->setConnection(connection);

            std::vector<std::string> elements = mapMsg->getMapNames();
            std::vector<std::string>::iterator iter = elements.begin();
            for(; iter != elements.end() ; ++iter) {
                std::string key = *iter;
                cms::Message::ValueType elementType = mapMsg->getValueType(key);

                switch(elementType) {
                    case cms::Message::BOOLEAN_TYPE:
                        msg->setBoolean(key, mapMsg->getBoolean(key));
                        break;
                    case cms::Message::BYTE_TYPE:
                        msg->setByte(key, mapMsg->getByte(key));
                        break;
                    case cms::Message::BYTE_ARRAY_TYPE:
                        msg->setBytes(key, mapMsg->getBytes(key));
                        break;
                    case cms::Message::CHAR_TYPE:
                        msg->setChar(key, mapMsg->getChar(key));
                        break;
                    case cms::Message::SHORT_TYPE:
                        msg->setShort(key, mapMsg->getShort(key));
                        break;
                    case cms::Message::INTEGER_TYPE:
                        msg->setInt(key, mapMsg->getInt(key));
                        break;
                    case cms::Message::LONG_TYPE:
                        msg->setLong(key, mapMsg->getLong(key));
                        break;
                    case cms::Message::FLOAT_TYPE:
                        msg->setFloat(key, mapMsg->getFloat(key));
                        break;
                    case cms::Message::DOUBLE_TYPE:
                        msg->setDouble(key, mapMsg->getDouble(key));
                        break;
                    case cms::Message::STRING_TYPE:
                        msg->setString(key, mapMsg->getString(key));
                        break;
                    default:
                        break;
                }
            }

            *amqMessage = msg;
        } else if (dynamic_cast<cms::ObjectMessage*>(message) != NULL) {
            cms::ObjectMessage* objMsg = dynamic_cast<cms::ObjectMessage*>(message);
            ActiveMQObjectMessage* msg = new ActiveMQObjectMessage();
            msg->setConnection(connection);
            msg->setObjectBytes(objMsg->getObjectBytes());
            *amqMessage = msg;
        } else if (dynamic_cast<cms::StreamMessage*>(message) != NULL) {
            cms::StreamMessage* streamMessage = dynamic_cast<cms::StreamMessage*>(message);
            streamMessage->reset();
            ActiveMQStreamMessage* msg = new ActiveMQStreamMessage();
            msg->setConnection(connection);

            try {
                while(true) {
                    cms::Message::ValueType elementType = streamMessage->getNextValueType();
                    int result = -1;
                    std::vector<unsigned char> buffer(255);

                    switch(elementType) {
                        case cms::Message::BOOLEAN_TYPE:
                            msg->writeBoolean(streamMessage->readBoolean());
                            break;
                        case cms::Message::BYTE_TYPE:
                            msg->writeBoolean(streamMessage->readBoolean());
                            break;
                        case cms::Message::BYTE_ARRAY_TYPE:
                            while ((result = streamMessage->readBytes(buffer)) != -1) {
                                msg->writeBytes(&buffer[0], 0, result);
                                buffer.clear();
                            }
                            break;
                        case cms::Message::CHAR_TYPE:
                            msg->writeChar(streamMessage->readChar());
                            break;
                        case cms::Message::SHORT_TYPE:
                            msg->writeShort(streamMessage->readShort());
                            break;
                        case cms::Message::INTEGER_TYPE:
                            msg->writeInt(streamMessage->readInt());
                            break;
                        case cms::Message::LONG_TYPE:
                            msg->writeLong(streamMessage->readLong());
                            break;
                        case cms::Message::FLOAT_TYPE:
                            msg->writeFloat(streamMessage->readFloat());
                            break;
                        case cms::Message::DOUBLE_TYPE:
                            msg->writeDouble(streamMessage->readDouble());
                            break;
                        case cms::Message::STRING_TYPE:
                            msg->writeString(streamMessage->readString());
                            break;
                        default:
                            break;
                    }
                }
            } catch (cms::MessageEOFException& e) {
                // if an end of message stream as expected
            } catch (cms::CMSException& e) {
            }

            *amqMessage = msg;
        } else if (dynamic_cast<cms::TextMessage*>(message) != NULL) {
            cms::TextMessage* textMsg = dynamic_cast<cms::TextMessage*>(message);
            ActiveMQTextMessage* msg = new ActiveMQTextMessage();
            msg->setConnection(connection);
            msg->setText(textMsg->getText());
            *amqMessage = msg;
        } else {
            *amqMessage = new ActiveMQMessage();
            (*amqMessage)->setConnection(connection);
        }

        ActiveMQMessageTransformation::copyProperties(message, dynamic_cast<cms::Message*>(*amqMessage));
    }

    return true;
}