in activemq-cpp/src/main/activemq/util/ActiveMQMessageTransformation.cpp [97:254]
/**
 * Produces a commands::Message equivalent of the given cms::Message.
 *
 * If the source object already is a commands::Message it is handed back
 * unchanged through amqMessage and nothing is allocated.  Otherwise a new
 * ActiveMQ message of the matching kind (Bytes, Map, Object, Stream, Text or
 * plain) is allocated with new, the body is copied into it, the properties are
 * copied via copyProperties() and the new object is stored in *amqMessage.
 *
 * @param message
 *      The source message, must not be NULL.
 * @param connection
 *      Passed to setConnection() on any newly created message.
 * @param amqMessage
 *      Out parameter receiving the resulting message, must not be NULL.  When
 *      this method returns true the pointee was allocated here; releasing it is
 *      presumably the caller's job (NOTE(review): confirm against callers).
 *      If an exception escapes, *amqMessage is left NULL.
 *
 * @return false if the source was already a commands::Message (no copy made),
 *         true if a new message was created.
 *
 * @throws NullPointerException if message or amqMessage is NULL.
 */
bool ActiveMQMessageTransformation::transformMessage(cms::Message* message, ActiveMQConnection* connection, Message** amqMessage) {

    if (message == NULL) {
        throw NullPointerException(__FILE__, __LINE__, "Provided source cms::Message pointer was NULL");
    }

    if (amqMessage == NULL) {
        throw NullPointerException(__FILE__, __LINE__, "Provided target commands::Message pointer was NULL");
    }

    // Already one of ours: hand the same object back, no conversion required.
    *amqMessage = dynamic_cast<Message*>(message);
    if (*amqMessage != NULL) {
        return false;
    }

    // Tracks the freshly allocated message so it can be released if anything
    // below throws before ownership is published through amqMessage.
    Message* converted = NULL;

    try {

        if (cms::BytesMessage* bytesMsg = dynamic_cast<cms::BytesMessage*>(message)) {

            bytesMsg->reset();
            ActiveMQBytesMessage* msg = new ActiveMQBytesMessage();
            converted = msg;
            msg->setConnection(connection);

            try {
                // Copy one byte at a time; the source signals the end of its
                // body by throwing MessageEOFException.
                for (;;) {
                    msg->writeByte(bytesMsg->readByte());
                }
            } catch (cms::MessageEOFException&) {
                // Expected: the whole body has been consumed.
            } catch (cms::CMSException&) {
                // Best effort: keep whatever was copied before the failure.
            }

        } else if (cms::MapMessage* mapMsg = dynamic_cast<cms::MapMessage*>(message)) {

            ActiveMQMapMessage* msg = new ActiveMQMapMessage();
            converted = msg;
            msg->setConnection(connection);

            const std::vector<std::string> names = mapMsg->getMapNames();
            for (std::vector<std::string>::const_iterator iter = names.begin(); iter != names.end(); ++iter) {
                const std::string& key = *iter;

                switch (mapMsg->getValueType(key)) {
                    case cms::Message::BOOLEAN_TYPE:
                        msg->setBoolean(key, mapMsg->getBoolean(key));
                        break;
                    case cms::Message::BYTE_TYPE:
                        msg->setByte(key, mapMsg->getByte(key));
                        break;
                    case cms::Message::BYTE_ARRAY_TYPE:
                        msg->setBytes(key, mapMsg->getBytes(key));
                        break;
                    case cms::Message::CHAR_TYPE:
                        msg->setChar(key, mapMsg->getChar(key));
                        break;
                    case cms::Message::SHORT_TYPE:
                        msg->setShort(key, mapMsg->getShort(key));
                        break;
                    case cms::Message::INTEGER_TYPE:
                        msg->setInt(key, mapMsg->getInt(key));
                        break;
                    case cms::Message::LONG_TYPE:
                        msg->setLong(key, mapMsg->getLong(key));
                        break;
                    case cms::Message::FLOAT_TYPE:
                        msg->setFloat(key, mapMsg->getFloat(key));
                        break;
                    case cms::Message::DOUBLE_TYPE:
                        msg->setDouble(key, mapMsg->getDouble(key));
                        break;
                    case cms::Message::STRING_TYPE:
                        msg->setString(key, mapMsg->getString(key));
                        break;
                    default:
                        // Entries of any other type are not copied.
                        break;
                }
            }

        } else if (cms::ObjectMessage* objMsg = dynamic_cast<cms::ObjectMessage*>(message)) {

            ActiveMQObjectMessage* msg = new ActiveMQObjectMessage();
            converted = msg;
            msg->setConnection(connection);
            msg->setObjectBytes(objMsg->getObjectBytes());

        } else if (cms::StreamMessage* streamMessage = dynamic_cast<cms::StreamMessage*>(message)) {

            streamMessage->reset();
            ActiveMQStreamMessage* msg = new ActiveMQStreamMessage();
            converted = msg;
            msg->setConnection(connection);

            try {
                bool moreElements = true;
                while (moreElements) {
                    switch (streamMessage->getNextValueType()) {
                        case cms::Message::BOOLEAN_TYPE:
                            msg->writeBoolean(streamMessage->readBoolean());
                            break;
                        case cms::Message::BYTE_TYPE:
                            // Previously copied with readBoolean/writeBoolean,
                            // which does not match the element type.
                            msg->writeByte(streamMessage->readByte());
                            break;
                        case cms::Message::BYTE_ARRAY_TYPE: {
                            // Drain the array in fixed-size chunks until the
                            // source reports -1, then emit it as ONE element.
                            // The scratch buffer must keep its size between
                            // reads, otherwise later reads have no capacity.
                            std::vector<unsigned char> chunk(255);
                            std::vector<unsigned char> bytes;
                            bool arrayRead = false;
                            int count = -1;
                            while ((count = streamMessage->readBytes(chunk)) != -1) {
                                arrayRead = true;
                                bytes.insert(bytes.end(), chunk.begin(), chunk.begin() + count);
                            }
                            if (arrayRead) {
                                // chunk is never empty, so it supplies a valid
                                // pointer for the zero-length case.
                                const unsigned char* data = bytes.empty() ? &chunk[0] : &bytes[0];
                                msg->writeBytes(data, 0, static_cast<int>(bytes.size()));
                            }
                            break;
                        }
                        case cms::Message::CHAR_TYPE:
                            msg->writeChar(streamMessage->readChar());
                            break;
                        case cms::Message::SHORT_TYPE:
                            msg->writeShort(streamMessage->readShort());
                            break;
                        case cms::Message::INTEGER_TYPE:
                            msg->writeInt(streamMessage->readInt());
                            break;
                        case cms::Message::LONG_TYPE:
                            msg->writeLong(streamMessage->readLong());
                            break;
                        case cms::Message::FLOAT_TYPE:
                            msg->writeFloat(streamMessage->readFloat());
                            break;
                        case cms::Message::DOUBLE_TYPE:
                            msg->writeDouble(streamMessage->readDouble());
                            break;
                        case cms::Message::STRING_TYPE:
                            msg->writeString(streamMessage->readString());
                            break;
                        default:
                            // Nothing is read for other types, so the same
                            // element would be reported again on every pass;
                            // stop copying instead of looping forever.
                            moreElements = false;
                            break;
                    }
                }
            } catch (cms::MessageEOFException&) {
                // Expected: every element has been consumed.
            } catch (cms::CMSException&) {
                // Best effort: keep whatever was copied before the failure.
            }

        } else if (cms::TextMessage* textMsg = dynamic_cast<cms::TextMessage*>(message)) {

            ActiveMQTextMessage* msg = new ActiveMQTextMessage();
            converted = msg;
            msg->setConnection(connection);
            msg->setText(textMsg->getText());

        } else {

            // No recognised body type: create a plain message.
            converted = new ActiveMQMessage();
            converted->setConnection(connection);
        }

        ActiveMQMessageTransformation::copyProperties(message, dynamic_cast<cms::Message*>(converted));

    } catch (...) {
        // Do not leak the partially built message; *amqMessage is still NULL.
        delete converted;
        throw;
    }

    *amqMessage = converted;
    return true;
}