func()

in golang/simple_consumer.go [205:276]


// receiveMessage opens a ReceiveMessage stream against the broker endpoints of
// messageQueue, reads it until the server closes the stream, and converts the
// collected messages into MessageViews.
//
// The whole exchange is bounded by timeout. It returns:
//   - the error from opening the stream, unchanged;
//   - a "[error] CODE=DEADLINE_EXCEEDED" error when the context is done before
//     the stream has been read to completion;
//   - the error returned by Recv when the stream fails for any other reason;
//   - an *ErrRpcStatus when the last status received is not Code_OK (this
//     includes the case where the server never sent a status);
//   - otherwise the message views (possibly empty, never nil) and a nil error.
func (sc *defaultSimpleConsumer) receiveMessage(ctx context.Context, request *v2.ReceiveMessageRequest, messageQueue *v2.MessageQueue, timeout time.Duration) ([]*MessageView, error) {
	const deadlineExceeded = "[error] CODE=DEADLINE_EXCEEDED"

	ctx = sc.cli.Sign(ctx)
	ctx, cancel := context.WithTimeout(ctx, timeout)
	// Cancelling on return releases the timer and is expected to unblock the
	// reader goroutine below if it is still waiting inside Recv.
	defer cancel()

	endpoints := messageQueue.GetBroker().GetEndpoints()
	receiveMessageClient, err := sc.cli.clientManager.ReceiveMessage(ctx, endpoints, request)
	if err != nil {
		return nil, err
	}

	// resps and recvErr are owned by the reader goroutine until it closes done;
	// they must only be read after <-done has been observed.
	var (
		resps   []*v2.ReceiveMessageResponse
		recvErr error
	)
	done := make(chan struct{})
	go func() {
		// Closing (rather than sending) never blocks, so the goroutine can
		// always exit even when nobody is waiting on done any more.
		defer close(done)
		for {
			resp, err := receiveMessageClient.Recv()
			if err == io.EOF {
				// Normal end of stream.
				return
			}
			if err != nil {
				sc.cli.log.Errorf("simpleConsumer recv msg err=%v, requestId=%s", err, utils.GetRequestID(ctx))
				recvErr = err
				return
			}
			sugarBaseLogger.Debugf("receiveMessage response: %v", resp)
			resps = append(resps, resp)
		}
	}()

	select {
	case <-ctx.Done():
		// timeout
		return nil, fmt.Errorf(deadlineExceeded)
	case <-done:
	}

	if recvErr != nil {
		// When the context expired at the same moment the stream failed, the
		// select above may pick either case; keep reporting it as a timeout.
		if ctx.Err() != nil {
			return nil, fmt.Errorf(deadlineExceeded)
		}
		return nil, recvErr
	}

	// Fallback used when the stream ends without the server sending a status.
	status := &v2.Status{
		Code:    v2.Code_INTERNAL_SERVER_ERROR,
		Message: "status was not set by server",
	}
	var deliveryTimestamp *timestamppb.Timestamp
	messageList := make([]*v2.Message, 0, len(resps))
	for _, resp := range resps {
		switch r := resp.GetContent().(type) {
		case *v2.ReceiveMessageResponse_Status:
			status = r.Status
		case *v2.ReceiveMessageResponse_Message:
			messageList = append(messageList, r.Message)
		case *v2.ReceiveMessageResponse_DeliveryTimestamp:
			deliveryTimestamp = r.DeliveryTimestamp
		default:
			sc.cli.log.Warnf("[bug] not recognized content for receive message response, mq=%v, resp=%v", messageQueue, resp)
		}
	}

	// Views are built only after the whole stream has been scanned so that
	// every message sees the final delivery timestamp, wherever it appeared.
	messageViewList := make([]*MessageView, 0, len(messageList))
	for _, message := range messageList {
		messageViewList = append(messageViewList, fromProtobuf_MessageView2(message, messageQueue, deliveryTimestamp))
	}

	if status.GetCode() != v2.Code_OK {
		return nil, &ErrRpcStatus{
			Code:    int32(status.GetCode()),
			Message: status.GetMessage(),
		}
	}
	return messageViewList, nil
}