in golang/simple_consumer.go [205:276]
// receiveMessage opens a server-streaming ReceiveMessage call against the
// broker endpoints of messageQueue, drains the stream, and converts the
// received messages into MessageViews.
//
// The call is bounded by timeout. If the context expires (or is cancelled by
// the caller) before the stream has been fully read, an error with the text
// "[error] CODE=DEADLINE_EXCEEDED" is returned. If the stream fails for any
// other reason, that error is returned unchanged. If the stream completes but
// the status sent by the server is not Code_OK (or no status was sent at all),
// an *ErrRpcStatus carrying the status code and message is returned.
func (sc *defaultSimpleConsumer) receiveMessage(ctx context.Context, request *v2.ReceiveMessageRequest, messageQueue *v2.MessageQueue, timeout time.Duration) ([]*MessageView, error) {
	const deadlineExceeded = "[error] CODE=DEADLINE_EXCEEDED"

	ctx = sc.cli.Sign(ctx)
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	endpoints := messageQueue.GetBroker().GetEndpoints()
	receiveMessageClient, err := sc.cli.clientManager.ReceiveMessage(ctx, endpoints, request)
	if err != nil {
		return nil, err
	}

	// recvResult is the single hand-off from the reader goroutine to the
	// caller, so no variable is shared between the two goroutines.
	type recvResult struct {
		resps []*v2.ReceiveMessageResponse
		err   error
	}
	// Buffered so the reader goroutine can always deliver its result and exit,
	// even when the caller has already returned because of a timeout.
	resultCh := make(chan recvResult, 1)
	go func() {
		var resps []*v2.ReceiveMessageResponse
		for {
			resp, recvErr := receiveMessageClient.Recv()
			if recvErr == io.EOF {
				// Normal end of stream.
				resultCh <- recvResult{resps: resps}
				return
			}
			if recvErr != nil {
				sc.cli.log.Errorf("simpleConsumer recv msg err=%v, requestId=%s", recvErr, utils.GetRequestID(ctx))
				resultCh <- recvResult{err: recvErr}
				return
			}
			sugarBaseLogger.Debugf("receiveMessage response: %v", resp)
			resps = append(resps, resp)
		}
	}()

	var result recvResult
	select {
	case result = <-resultCh:
	case <-ctx.Done():
		// The stream may have completed at the very moment the deadline
		// fired; prefer a finished result over reporting a timeout.
		select {
		case result = <-resultCh:
		default:
			return nil, fmt.Errorf(deadlineExceeded)
		}
	}
	if result.err != nil {
		if ctx.Err() != nil {
			// Recv failed because the context expired or was cancelled; keep
			// reporting that as a timeout, as callers expect.
			return nil, fmt.Errorf(deadlineExceeded)
		}
		return nil, result.err
	}

	// Fallback used when the server closes the stream without sending a status.
	status := &v2.Status{
		Code:    v2.Code_INTERNAL_SERVER_ERROR,
		Message: "status was not set by server",
	}
	var deliveryTimestamp *timestamppb.Timestamp
	messageList := make([]*v2.Message, 0)
	for _, resp := range result.resps {
		switch r := resp.GetContent().(type) {
		case *v2.ReceiveMessageResponse_Status:
			status = r.Status
		case *v2.ReceiveMessageResponse_Message:
			messageList = append(messageList, r.Message)
		case *v2.ReceiveMessageResponse_DeliveryTimestamp:
			deliveryTimestamp = r.DeliveryTimestamp
		default:
			sc.cli.log.Warnf("[bug] not recognized content for receive message response, mq=%v, resp=%v", messageQueue, resp)
		}
	}

	// Views are built only after the whole stream has been read so that every
	// message gets the last delivery timestamp seen, regardless of the order
	// in which the server interleaved the frames.
	messageViewList := make([]*MessageView, 0, len(messageList))
	for _, message := range messageList {
		messageViewList = append(messageViewList, fromProtobuf_MessageView2(message, messageQueue, deliveryTimestamp))
	}

	if status.GetCode() != v2.Code_OK {
		return nil, &ErrRpcStatus{
			Code:    int32(status.GetCode()),
			Message: status.GetMessage(),
		}
	}
	return messageViewList, nil
}