in consumer/push_consumer.go [380:431]
// ConsumeMessageDirectly runs a single message through pc.consumeInner and
// reports the outcome as an internal.ConsumeMessageDirectlyResult.
//
// The message is attributed to a queue built from msg.Topic, the supplied
// brokerName and msg.Queue.QueueId. msg is dereferenced without a nil check,
// so it must be non-nil (NOTE(review): msg.Queue presumably has to be non-nil
// as well — confirm against primitive.MessageExt).
//
// The returned result always has Order=false and AutoCommit=true.
// SpentTimeMills is the elapsed wall time, in whole milliseconds, measured from
// just before resetRetryAndNamespace until consumeInner returns; the same value
// is reported to pc.stat.increaseConsumeRT.
//
// Outcome mapping:
//   - consumeInner returned an error: ConsumeResult is internal.ThrowException
//     and Remark carries err.Error().
//   - result is ConsumeSuccess: ConsumeResult is internal.ConsumeSuccess.
//   - result is ConsumeRetryLater: ConsumeResult is internal.ConsumeRetryLater.
//   - any other result: ConsumeResult keeps its zero value and no
//     primitive.PropCtxType property is recorded on the message context.
func (pc *pushConsumer) ConsumeMessageDirectly(msg *primitive.MessageExt, brokerName string) *internal.ConsumeMessageDirectlyResult {
	msgs := []*primitive.MessageExt{msg}
	mq := &primitive.MessageQueue{
		Topic:      msg.Topic,
		BrokerName: brokerName,
		QueueId:    msg.Queue.QueueId,
	}

	beginTime := time.Now()
	pc.resetRetryAndNamespace(msgs)

	msgCtx := &primitive.ConsumeMessageContext{
		Properties:    make(map[string]string),
		ConsumerGroup: pc.consumerGroup,
		MQ:            mq,
		Msgs:          msgs,
	}
	concurrentCtx := primitive.NewConsumeConcurrentlyContext()
	concurrentCtx.MQ = *mq

	// Layer the consumer context, the method marker and the concurrently
	// context onto a fresh background context, in that order.
	ctx := primitive.WithConsumerCtx(context.Background(), msgCtx)
	ctx = primitive.WithMethod(ctx, primitive.ConsumerPush)
	ctx = primitive.WithConcurrentlyCtx(ctx, concurrentCtx)

	result, err := pc.consumeInner(ctx, msgs)

	// Truncate to whole milliseconds once; the value feeds both the returned
	// result and the consume-RT statistic.
	spentMillis := int64(time.Since(beginTime) / time.Millisecond)

	res := &internal.ConsumeMessageDirectlyResult{
		Order:          false,
		AutoCommit:     true,
		SpentTimeMills: spentMillis,
	}

	// An error takes precedence over whatever result value was returned.
	// There is intentionally no default case: unrecognised results leave both
	// res.ConsumeResult and the context property untouched.
	switch {
	case err != nil:
		msgCtx.Properties[primitive.PropCtxType] = string(primitive.ExceptionReturn)
		res.ConsumeResult = internal.ThrowException
		res.Remark = err.Error()
	case result == ConsumeSuccess:
		msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
		res.ConsumeResult = internal.ConsumeSuccess
	case result == ConsumeRetryLater:
		msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
		res.ConsumeResult = internal.ConsumeRetryLater
	}

	pc.stat.increaseConsumeRT(pc.consumerGroup, mq.Topic, spentMillis)
	return res
}