func getMessageProperties()

in packetbeat/protos/amqp/amqp_parser.go [192:328]


func getMessageProperties(s *amqpStream, data []byte) bool {
	m := s.message

	// properties are coded in the two first bytes
	prop1 := data[0]
	prop2 := data[1]
	var offset uint32 = 2

	// while last bit set, we have another property flag field
	for lastbit := 1; data[lastbit]&1 == 1; {
		lastbit += 2
		offset += 2
	}

	if hasProperty(prop1, contentTypeProp) {
		contentType, next, err := getShortString(data, offset+1, uint32(data[offset]))
		if err {
			logp.Warn("Failed to get content type in header frame")
			return true
		}
		m.fields["content-type"] = contentType
		offset = next
	}

	if hasProperty(prop1, contentEncodingProp) {
		contentEncoding, next, err := getShortString(data, offset+1, uint32(data[offset]))
		if err {
			logp.Warn("Failed to get content encoding in header frame")
			return true
		}
		m.fields["content-encoding"] = contentEncoding
		offset = next
	}

	if hasProperty(prop1, headersProp) {
		headers := mapstr.M{}
		next, err, exists := getTable(headers, data, offset)
		if !err && exists {
			m.fields["headers"] = headers
		} else if err {
			logp.Warn("Failed to get headers")
			return true
		}
		offset = next
	}

	if hasProperty(prop1, deliveryModeProp) {
		switch data[offset] {
		case 1:
			m.fields["delivery-mode"] = "non-persistent"
		case 2:
			m.fields["delivery-mode"] = "persistent"
		}
		offset++
	}

	if hasProperty(prop1, priorityProp) {
		m.fields["priority"] = data[offset]
		offset++
	}

	if hasProperty(prop1, correlationIDProp) {
		correlationID, next, err := getShortString(data, offset+1, uint32(data[offset]))
		if err {
			logp.Warn("Failed to get correlation-id in header frame")
			return true
		}
		m.fields["correlation-id"] = correlationID
		offset = next
	}

	if hasProperty(prop1, replyToProp) {
		replyTo, next, err := getShortString(data, offset+1, uint32(data[offset]))
		if err {
			logp.Warn("Failed to get reply-to in header frame")
			return true
		}
		m.fields["reply-to"] = replyTo
		offset = next
	}

	if hasProperty(prop1, expirationProp) {
		expiration, next, err := getShortString(data, offset+1, uint32(data[offset]))
		if err {
			logp.Warn("Failed to get expiration in header frame")
			return true
		}
		m.fields["expiration"] = expiration
		offset = next
	}

	if hasProperty(prop2, messageIDProp) {
		messageID, next, err := getShortString(data, offset+1, uint32(data[offset]))
		if err {
			logp.Warn("Failed to get message id in header frame")
			return true
		}
		m.fields["message-id"] = messageID
		offset = next
	}

	if hasProperty(prop2, timestampProp) {
		t := time.Unix(int64(binary.BigEndian.Uint64(data[offset:offset+8])), 0)
		m.fields["timestamp"] = t.Format(amqpTimeLayout)
		offset += 8
	}

	if hasProperty(prop2, typeProp) {
		msgType, next, err := getShortString(data, offset+1, uint32(data[offset]))
		if err {
			logp.Warn("Failed to get message type in header frame")
			return true
		}
		m.fields["type"] = msgType
		offset = next
	}

	if hasProperty(prop2, userIDProp) {
		userID, next, err := getShortString(data, offset+1, uint32(data[offset]))
		if err {
			logp.Warn("Failed to get user id in header frame")
			return true
		}
		m.fields["user-id"] = userID
		offset = next
	}

	if hasProperty(prop2, appIDProp) {
		appID, _, err := getShortString(data, offset+1, uint32(data[offset]))
		if err {
			logp.Warn("Failed to get app-id in header frame")
			return true
		}
		m.fields["app-id"] = appID
	}
	return false
}