in consumer/consumer.go [914:961]
// processPullResult post-processes the outcome of a pull request for mq.
//
// The result's suggested broker id is always handed to
// updatePullFromWhichNode. If the status is anything other than
// primitive.PullFound the result is otherwise left untouched. For a
// PullFound result the body is decoded into messages, which are then:
//
//  1. filtered by the subscription's tags, when data has at least one tag
//     and class filter mode is off;
//  2. passed through every configured filter-message hook, in order, each
//     hook receiving the list produced by the previous step;
//  3. annotated with the queue, the transaction id (only when the
//     PropertyTransactionPrepared property parses as true) and the
//     result's min/max offsets.
//
// The resulting list replaces the message list stored on result.
func (dc *defaultConsumer) processPullResult(mq *primitive.MessageQueue, result *primitive.PullResult, data *internal.SubscriptionData) {
	dc.updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)

	// Only a successful pull carries messages to process.
	if result.Status != primitive.PullFound {
		return
	}

	result.SetMessageExts(primitive.DecodeMessage(result.GetBody()))
	decoded := result.GetMessageExts()

	// Re-filter by tag on the client side; skipped when the subscription has
	// no tags or uses class filter mode.
	kept := decoded
	if data.Tags.Len() > 0 && !data.ClassFilterMode {
		kept = make([]*primitive.MessageExt, 0)
		for _, m := range decoded {
			if _, ok := data.Tags.Contains(m.GetTags()); ok {
				kept = append(kept, m)
			}
		}
	}

	// Chain the user-supplied filter hooks: each one sees the output of the
	// one before it.
	for _, filter := range dc.option.filterMessageHooks {
		// NOTE(review): the hook's error is discarded and whatever list it
		// returned is used as-is — confirm this best-effort behavior is
		// intended.
		kept, _ = filter(&hooks.FilterMessageContext{
			ConsumerGroup: dc.consumerGroup,
			Msg:           kept,
			MQ:            mq,
			UnitMode:      dc.unitMode,
		})
	}

	// The offsets are the same for every message of this result, so format
	// them once.
	minOffset := strconv.FormatInt(result.MinOffset, 10)
	maxOffset := strconv.FormatInt(result.MaxOffset, 10)
	for _, m := range kept {
		m.Queue = mq
		// A missing or unparsable flag yields false from ParseBool, so the
		// parse error can be ignored here.
		if prepared, _ := strconv.ParseBool(m.GetProperty(primitive.PropertyTransactionPrepared)); prepared {
			m.TransactionId = m.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
		}
		m.WithProperty(primitive.PropertyMinOffset, minOffset)
		m.WithProperty(primitive.PropertyMaxOffset, maxOffset)
	}

	result.SetMessageExts(kept)
}