in runtime/core/protocol/grpc/consumer/consumer_service.go [93:113]
func (c *ConsumerService) SubscribeStream(stream pb.ConsumerService_SubscribeStreamServer) error {
for {
req, err := stream.Recv()
if err == io.EOF {
log.Infof("receive io.EOF, exit the recv goroutine")
break
}
if err != nil {
log.Warnf("receive err:%v", err)
break
}
if len(req.SubscriptionItems) == 0 {
log.Infof("receive reply msg, protocol:GRPC, client:%v", req.Header.Ip)
c.handleSubscribeReply(req, stream)
} else {
log.Infof("receive sub msg, protocol:GRPC, client:%v", req.Header.Ip)
c.handleSubscriptionStream(req, stream)
}
}
return nil
}