func()

in consumer/push_consumer.go [380:431]


func (pc *pushConsumer) ConsumeMessageDirectly(msg *primitive.MessageExt, brokerName string) *internal.ConsumeMessageDirectlyResult {
	var msgs = []*primitive.MessageExt{msg}
	var mq = &primitive.MessageQueue{
		Topic:      msg.Topic,
		BrokerName: brokerName,
		QueueId:    msg.Queue.QueueId,
	}

	beginTime := time.Now()
	pc.resetRetryAndNamespace(msgs)
	var result ConsumeResult

	var err error
	msgCtx := &primitive.ConsumeMessageContext{
		Properties:    make(map[string]string),
		ConsumerGroup: pc.consumerGroup,
		MQ:            mq,
		Msgs:          msgs,
	}
	ctx := context.Background()
	ctx = primitive.WithConsumerCtx(ctx, msgCtx)
	ctx = primitive.WithMethod(ctx, primitive.ConsumerPush)
	concurrentCtx := primitive.NewConsumeConcurrentlyContext()
	concurrentCtx.MQ = *mq
	ctx = primitive.WithConcurrentlyCtx(ctx, concurrentCtx)

	result, err = pc.consumeInner(ctx, msgs)

	consumeRT := time.Now().Sub(beginTime)

	res := &internal.ConsumeMessageDirectlyResult{
		Order:          false,
		AutoCommit:     true,
		SpentTimeMills: int64(consumeRT / time.Millisecond),
	}

	if err != nil {
		msgCtx.Properties[primitive.PropCtxType] = string(primitive.ExceptionReturn)
		res.ConsumeResult = internal.ThrowException
		res.Remark = err.Error()
	} else if result == ConsumeSuccess {
		msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
		res.ConsumeResult = internal.ConsumeSuccess
	} else if result == ConsumeRetryLater {
		msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
		res.ConsumeResult = internal.ConsumeRetryLater
	}

	pc.stat.increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond))

	return res
}