func()

in runtime/core/protocol/grpc/consumer/consumer_service.go [93:113]


// SubscribeStream reads messages from the client stream until it ends and
// dispatches each one: a message without subscription items is handed to
// handleSubscribeReply, any other message to handleSubscriptionStream.
//
// It returns nil once the client has finished sending (Recv reports io.EOF).
// Any other receive error is logged and returned, so the RPC terminates with
// a non-OK status instead of reporting success for a failed stream.
func (c *ConsumerService) SubscribeStream(stream pb.ConsumerService_SubscribeStreamServer) error {
	for {
		req, err := stream.Recv()
		if err == io.EOF {
			// Normal termination: the client closed its sending side.
			log.Infof("receive io.EOF, exit the recv goroutine")
			return nil
		}
		if err != nil {
			log.Warnf("receive err:%v", err)
			return err
		}

		// Read the client IP through the generated getters, which tolerate a
		// missing header; a direct req.Header.Ip access would panic on a
		// message that was sent without one.
		clientIP := req.GetHeader().GetIp()

		// NOTE(review): whatever the two handlers return (if anything) is not
		// inspected here — confirm they report their own failures.
		if len(req.SubscriptionItems) == 0 {
			log.Infof("receive reply msg, protocol:GRPC, client:%v", clientIP)
			c.handleSubscribeReply(req, stream)
			continue
		}
		log.Infof("receive sub msg, protocol:GRPC, client:%v", clientIP)
		c.handleSubscriptionStream(req, stream)
	}
}