in shims/qpid-proton-cpp/src/qpidit/jms_hdrs_props_test/Receiver.cpp [300:373]
void Receiver::processMessageHeaders(const proton::message& msg) {
addMessageHeaderString("JMS_TYPE_HEADER", msg.subject());
if (_flagMap.isMember("JMS_CORRELATIONID_AS_BYTES") && _flagMap["JMS_CORRELATIONID_AS_BYTES"].asBool()) {
addMessageHeaderByteArray("JMS_CORRELATIONID_HEADER", proton::get<proton::binary>(msg.correlation_id()));
} else {
try {
addMessageHeaderString("JMS_CORRELATIONID_HEADER", proton::get<std::string>(msg.correlation_id()));
} catch (const std::exception& e) {} // TODO: UGLY, how do you check if there _is_ a correlation id?
}
std::string reply_to = msg.reply_to();
// Some brokers prepend 'queue://' and 'topic://' to reply_to addresses, strip these when present
if (_flagMap.isMember("JMS_REPLYTO_AS_TOPIC") && _flagMap["JMS_REPLYTO_AS_TOPIC"].asBool()) {
if (reply_to.find("topic://") == 0) {
addMessageHeaderDestination("JMS_REPLYTO_HEADER", qpidit::JMS_TOPIC, reply_to.substr(8));
} else {
addMessageHeaderDestination("JMS_REPLYTO_HEADER", qpidit::JMS_TOPIC, reply_to);
}
} else {
if (reply_to.find("queue://") == 0) {
addMessageHeaderDestination("JMS_REPLYTO_HEADER", qpidit::JMS_QUEUE, reply_to.substr(8));
} else {
addMessageHeaderDestination("JMS_REPLYTO_HEADER", qpidit::JMS_QUEUE, reply_to);
}
}
if (_flagMap.isMember("JMS_CLIENT_CHECKS") && _flagMap["JMS_CLIENT_CHECKS"].asBool()) {
// Get and check message headers which are set by a JMS-compliant sender
// See: https://docs.oracle.com/cd/E19798-01/821-1841/bnces/index.html
// 1. Destination
std::string destination = msg.to();
stripQueueTopicPrefix(destination); // Some brokers prepend "queue://" or "topic://"
if (destination.compare(_queueName) != 0) {
std::ostringstream oss;
oss << "Invalid header: found \"" << destination << "\"; expected \"" << _queueName << "\"";
throw qpidit::UnexpectedJMSMessageHeader("JMS_DESTINATION", oss.str());
}
// 2. Delivery Mode (persistence)
if (msg.durable()) {
throw qpidit::UnexpectedJMSMessageHeader("JMS_DELIVERY_MODE", "Expected NON_PERSISTENT, found PERSISTENT");
}
// 3. Expiration
const time_t expiryTime = msg.expiry_time().milliseconds();
if (expiryTime != 0) {
std::ostringstream oss;
oss << "Expected expiration time 0, found " << expiryTime << " (" << std::asctime(std::localtime(&expiryTime)) << ")";
throw qpidit::UnexpectedJMSMessageHeader("JMS_EXPIRATION", oss.str());
}
// 4. Message ID
proton::message_id mid = msg.id();
// TODO: Find a check for this
// 5. Message priority
// TODO: PROTON-1505: C++ client does not return the default (4) when the message header is not on
// the wire but a value of 0. Disable this test until fixed.
/*
int msgPriority = msg.priority();
if (msgPriority != 4) { // Default JMS message priority
std::ostringstream oss;
oss << "Expected default priority (4), found priority " << msgPriority;
throw qpidit::UnexpectedJMSMessageHeader("JMS_PRIORITY", oss.str());
}
*/
// 6. Message timestamp
const time_t creationTime = msg.creation_time().milliseconds();
const time_t currentTime = proton::timestamp::now().milliseconds();
if (currentTime - creationTime > 60 * 1000) { // More than 1 minute old
std::ostringstream oss;
oss << "Header contains suspicious value: found " << creationTime << " (";
oss << std::asctime(std::localtime(&creationTime)) << ") is not within 1 minute of now ";
oss << currentTime << " (" << std::asctime(std::localtime(¤tTime)) << ")";
throw qpidit::UnexpectedJMSMessageHeader("JMS_TIMESTAMP", oss.str());
}
}
}