func()

in consumer/consumer.go [914:961]


func (dc *defaultConsumer) processPullResult(mq *primitive.MessageQueue, result *primitive.PullResult, data *internal.SubscriptionData) {

	dc.updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)

	switch result.Status {
	case primitive.PullFound:
		result.SetMessageExts(primitive.DecodeMessage(result.GetBody()))
		msgs := result.GetMessageExts()

		// filter message according to tags
		msgListFilterAgain := msgs
		if data.Tags.Len() > 0 && !data.ClassFilterMode {
			msgListFilterAgain = make([]*primitive.MessageExt, 0)
			for _, msg := range msgs {
				_, exist := data.Tags.Contains(msg.GetTags())
				if exist {
					msgListFilterAgain = append(msgListFilterAgain, msg)
				}
			}
		}

		if dc.option.filterMessageHooks != nil {
			for _, hook := range dc.option.filterMessageHooks {
				ctx := &hooks.FilterMessageContext{
					ConsumerGroup: dc.consumerGroup,
					Msg:           msgListFilterAgain,
					MQ:            mq,
					UnitMode:      dc.unitMode,
				}
				msgListFilterAgain, _ = hook(ctx)
			}
		}

		// TODO: add filter message hook
		for _, msg := range msgListFilterAgain {
			msg.Queue = mq
			traFlag, _ := strconv.ParseBool(msg.GetProperty(primitive.PropertyTransactionPrepared))
			if traFlag {
				msg.TransactionId = msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
			}

			msg.WithProperty(primitive.PropertyMinOffset, strconv.FormatInt(result.MinOffset, 10))
			msg.WithProperty(primitive.PropertyMaxOffset, strconv.FormatInt(result.MaxOffset, 10))
		}

		result.SetMessageExts(msgListFilterAgain)
	}
}