in src/main/csharp/MessageConsumer.cs [289:459]
/// <summary>
/// Decodes a raw wire-format buffer into an NMS message.
/// </summary>
/// <remarks>
/// The buffer is read as a sequence of tagged fields until an
/// <c>MFT_NONE</c> tag is read or the end of the buffer is reached.
/// Parsing is best-effort: any exception raised while reading fields is
/// logged and the values decoded up to that point are still used to build
/// the message. If a consumer transformer is installed and returns a
/// non-null message, that message is returned in place of the decoded one.
/// </remarks>
/// <param name="msgData">The encoded message bytes.</param>
/// <returns>
/// The decoded (and possibly transformed) message, or <c>null</c> when no
/// recognized message type was found in the buffer.
/// </returns>
protected virtual IMessage ToNmsMessage(byte[] msgData)
{
    BaseMessage nmsMessage = null;
    int messageType = WireFormat.MT_UNKNOWN;
    int fieldType = WireFormat.MFT_NONE;
    DateTime messageTimestamp = DateTime.UtcNow;
    string messageNMSType = null;
    string messageCorrelationId = null;
    IDestination messageReplyTo = null;
    MsgDeliveryMode messageDeliveryMode = MsgDeliveryMode.NonPersistent;
    MsgPriority messagePriority = MsgPriority.Normal;
    TimeSpan messageTimeToLive = TimeSpan.FromTicks(0);
    byte[] messageProperties = null;
    int fieldLen;
    int index = 0;
    string messageID = string.Empty;
    byte[] messageBody = null;

    try
    {
        // Parse the common message fields
        do
        {
            fieldType = ReadInt(msgData, ref index);
            switch(fieldType)
            {
            case WireFormat.MFT_NONE:
                break;

            case WireFormat.MFT_MESSAGEID:
                messageID = ReadString(msgData, ref index);
                break;

            case WireFormat.MFT_TIMESTAMP:
                fieldLen = ReadInt(msgData, ref index);
                Debug.Assert(sizeof(long) == fieldLen);
                messageTimestamp = DateTime.FromBinary(ReadLong(msgData, ref index));
                break;

            case WireFormat.MFT_NMSTYPE:
                messageNMSType = ReadString(msgData, ref index);
                break;

            case WireFormat.MFT_CORRELATIONID:
                messageCorrelationId = ReadString(msgData, ref index);
                break;

            case WireFormat.MFT_REPLYTO:
                string replyToDestName = ReadString(msgData, ref index);
                messageReplyTo = this.session.GetDestination(replyToDestName);
                break;

            case WireFormat.MFT_DELIVERYMODE:
                fieldLen = ReadInt(msgData, ref index);
                Debug.Assert(sizeof(int) == fieldLen);
                messageDeliveryMode = (MsgDeliveryMode) ReadInt(msgData, ref index);
                break;

            case WireFormat.MFT_PRIORITY:
                fieldLen = ReadInt(msgData, ref index);
                Debug.Assert(sizeof(int) == fieldLen);
                messagePriority = (MsgPriority) ReadInt(msgData, ref index);
                break;

            case WireFormat.MFT_TIMETOLIVE:
                fieldLen = ReadInt(msgData, ref index);
                Debug.Assert(sizeof(long) == fieldLen);
                messageTimeToLive = TimeSpan.FromTicks(ReadLong(msgData, ref index));
                break;

            case WireFormat.MFT_HEADERS:
                fieldLen = ReadInt(msgData, ref index);
                // Validate before allocating so a corrupt length cannot trigger
                // a huge allocation or leave a partially filled property block.
                if(fieldLen < 0 || fieldLen > msgData.Length - index)
                {
                    throw new FormatException(
                        string.Format("Invalid headers field length {0} at offset {1}", fieldLen, index));
                }

                byte[] headerBytes = new byte[fieldLen];
                Array.Copy(msgData, index, headerBytes, 0, fieldLen);
                index += fieldLen;
                messageProperties = headerBytes;
                break;

            case WireFormat.MFT_MSGTYPE:
                fieldLen = ReadInt(msgData, ref index);
                Debug.Assert(sizeof(int) == fieldLen);
                messageType = ReadInt(msgData, ref index);
                break;

            case WireFormat.MFT_BODY:
                messageBody = ReadBytes(msgData, ref index);
                break;

            default:
                // Skip past this field.
                Tracer.WarnFormat("Unknown message field type: {0}", fieldType);
                fieldLen = ReadInt(msgData, ref index);
                // A negative length would move the cursor backwards and could
                // make this loop spin forever on a malformed buffer.
                if(fieldLen < 0 || fieldLen > msgData.Length - index)
                {
                    throw new FormatException(
                        string.Format("Invalid field length {0} for field type {1}", fieldLen, fieldType));
                }

                index += fieldLen;
                break;
            }
        } while(WireFormat.MFT_NONE != fieldType && index < msgData.Length);
    }
    catch(Exception ex)
    {
        // Best-effort: keep whatever was decoded before the failure.
        Tracer.ErrorFormat("Exception parsing message: {0}", ex.Message);
    }

    // Instantiate the message type
    switch(messageType)
    {
    case WireFormat.MT_MESSAGE:
        nmsMessage = new BaseMessage();
        break;

    case WireFormat.MT_TEXTMESSAGE:
        nmsMessage = new TextMessage();
        if(null != messageBody)
        {
            ((TextMessage) nmsMessage).Text = Encoding.UTF8.GetString(messageBody);
        }
        break;

    case WireFormat.MT_BYTESMESSAGE:
        nmsMessage = new BytesMessage();
        if(null != messageBody)
        {
            ((BytesMessage) nmsMessage).Content = messageBody;
        }
        break;

    case WireFormat.MT_UNKNOWN:
    default:
        break;
    }

    if(null == nmsMessage)
    {
        // No recognized message type was decoded.
        return null;
    }

    // Set the common headers.
    try
    {
        nmsMessage.NMSMessageId = messageID;
        nmsMessage.NMSCorrelationID = messageCorrelationId;
        nmsMessage.NMSDestination = this.destination;
        nmsMessage.NMSReplyTo = messageReplyTo;
        nmsMessage.NMSDeliveryMode = messageDeliveryMode;
        nmsMessage.NMSPriority = messagePriority;
        nmsMessage.NMSTimestamp = messageTimestamp;
        nmsMessage.NMSTimeToLive = messageTimeToLive;
        nmsMessage.NMSType = messageNMSType;
        if(null != messageProperties)
        {
            nmsMessage.PropertiesMap = PrimitiveMap.Unmarshal(messageProperties);
        }
    }
    catch(InvalidOperationException ex)
    {
        // Still deliver the message, but do not hide the failure.
        Tracer.ErrorFormat("Exception setting message headers: {0}", ex.Message);
    }

    IMessage result = nmsMessage;
    if(null != this.ConsumerTransformer)
    {
        IMessage transformedMessage = ConsumerTransformer(this.session, this, nmsMessage);
        if(null != transformedMessage)
        {
            result = transformedMessage;
        }
    }

    // The transformer may return any IMessage implementation; the read-only
    // flags exist only on BaseMessage, so apply them only when available
    // instead of dereferencing a failed cast.
    BaseMessage readOnlyTarget = result as BaseMessage;
    if(null != readOnlyTarget)
    {
        readOnlyTarget.ReadOnlyBody = true;
        readOnlyTarget.ReadOnlyProperties = true;
    }
    else
    {
        Tracer.WarnFormat("Transformed message of type {0} is not a BaseMessage; read-only flags were not applied.", result.GetType().FullName);
    }

    return result;
}