in packetbeat/protos/amqp/amqp_parser.go [192:328]
// getMessageProperties decodes the property list found in data and stores
// every property that is present in s.message.fields.
//
// data is expected to begin with the two property-flag bytes; the property
// values follow the flag field(s) in the order they are tested below. Which
// properties are present is decided by hasProperty on those two flag bytes.
//
// The function returns true when decoding failed (a warning is logged and
// the fields decoded so far are left in place) and false on success. Every
// read from data is bounds-checked first, so a truncated or malformed frame
// is reported as a failure instead of causing an index-out-of-range panic.
func getMessageProperties(s *amqpStream, data []byte) bool {
	m := s.message

	// properties are coded in the two first bytes
	if len(data) < 2 {
		logp.Warn("Failed to get property flags in header frame")
		return true
	}
	prop1 := data[0]
	prop2 := data[1]
	var offset uint32 = 2

	// while last bit set, we have another property flag field; the extra
	// fields are skipped, only the first two flag bytes are interpreted.
	for lastbit := 1; data[lastbit]&1 == 1; {
		lastbit += 2
		offset += 2
		if lastbit >= len(data) {
			// The continuation bit promised a flag field that is not there.
			logp.Warn("Failed to get property flags in header frame")
			return true
		}
	}

	// available reports whether n more bytes can be read at the current
	// offset. The arithmetic is done in uint64 so it cannot wrap around.
	available := func(n uint32) bool {
		return uint64(offset)+uint64(n) <= uint64(len(data))
	}

	// readShortString decodes the length-prefixed string at the current
	// offset, stores it under key and advances offset past it. It returns
	// false when the length byte or the string body is missing.
	readShortString := func(key string) bool {
		if !available(1) {
			return false
		}
		value, next, err := getShortString(data, offset+1, uint32(data[offset]))
		if err {
			return false
		}
		m.fields[key] = value
		offset = next
		return true
	}

	if hasProperty(prop1, contentTypeProp) {
		if !readShortString("content-type") {
			logp.Warn("Failed to get content type in header frame")
			return true
		}
	}
	if hasProperty(prop1, contentEncodingProp) {
		if !readShortString("content-encoding") {
			logp.Warn("Failed to get content encoding in header frame")
			return true
		}
	}
	if hasProperty(prop1, headersProp) {
		headers := mapstr.M{}
		// NOTE(review): getTable receives the whole buffer and the offset;
		// presumably it validates its own reads - confirm.
		next, err, exists := getTable(headers, data, offset)
		if err {
			logp.Warn("Failed to get headers")
			return true
		}
		// The headers field is only published when getTable reports that
		// a table exists.
		if exists {
			m.fields["headers"] = headers
		}
		offset = next
	}
	if hasProperty(prop1, deliveryModeProp) {
		if !available(1) {
			logp.Warn("Failed to get delivery mode in header frame")
			return true
		}
		// Values other than 1 and 2 are consumed but not reported.
		switch data[offset] {
		case 1:
			m.fields["delivery-mode"] = "non-persistent"
		case 2:
			m.fields["delivery-mode"] = "persistent"
		}
		offset++
	}
	if hasProperty(prop1, priorityProp) {
		if !available(1) {
			logp.Warn("Failed to get priority in header frame")
			return true
		}
		m.fields["priority"] = data[offset]
		offset++
	}
	if hasProperty(prop1, correlationIDProp) {
		if !readShortString("correlation-id") {
			logp.Warn("Failed to get correlation-id in header frame")
			return true
		}
	}
	if hasProperty(prop1, replyToProp) {
		if !readShortString("reply-to") {
			logp.Warn("Failed to get reply-to in header frame")
			return true
		}
	}
	if hasProperty(prop1, expirationProp) {
		if !readShortString("expiration") {
			logp.Warn("Failed to get expiration in header frame")
			return true
		}
	}
	if hasProperty(prop2, messageIDProp) {
		if !readShortString("message-id") {
			logp.Warn("Failed to get message id in header frame")
			return true
		}
	}
	if hasProperty(prop2, timestampProp) {
		if !available(8) {
			logp.Warn("Failed to get timestamp in header frame")
			return true
		}
		// The timestamp is an 8-byte big-endian value interpreted as
		// seconds since the Unix epoch.
		t := time.Unix(int64(binary.BigEndian.Uint64(data[offset:offset+8])), 0)
		m.fields["timestamp"] = t.Format(amqpTimeLayout)
		offset += 8
	}
	if hasProperty(prop2, typeProp) {
		if !readShortString("type") {
			logp.Warn("Failed to get message type in header frame")
			return true
		}
	}
	if hasProperty(prop2, userIDProp) {
		if !readShortString("user-id") {
			logp.Warn("Failed to get user id in header frame")
			return true
		}
	}
	if hasProperty(prop2, appIDProp) {
		if !readShortString("app-id") {
			logp.Warn("Failed to get app-id in header frame")
			return true
		}
	}
	return false
}