func()

in runtime/core/protocol/grpc/consumer/consumer_processor.go [159:211]


// SubscribeStream handles a stream-mode subscription request.
//
// It validates the header and body of msg, builds one GroupClient per
// subscription item, registers every client with consumerMgr and then with
// the consumer that consumerMgr returns for the message's consumer group.
// If any registration on that consumer reports true, the consumer group is
// restarted through consumerMgr.
//
// On a validation failure the matching protocol error code is sent back
// through emiter before the validation error is returned. Errors from
// consumerMgr (RegisterClient, GetConsumer, RestartConsumer) are returned
// as-is; no stream response is sent for them here.
func (p *processor) SubscribeStream(consumerMgr ConsumerManager, emiter emitter.EventEmitter, msg *pb.Subscription) error {
	hdr := msg.Header
	if err := validator.ValidateHeader(hdr); err != nil {
		log.Warnf("invalid header:%v", err)
		emiter.SendStreamResp(hdr, grpc.EVENTMESH_PROTOCOL_HEADER_ERR)
		return err
	}
	if err := validator.ValidateSubscription(consts.STREAM, msg); err != nil {
		log.Warnf("invalid body:%v", err)
		emiter.SendStreamResp(hdr, grpc.EVENTMESH_PROTOCOL_BODY_ERR)
		return err
	}

	consumerGroup := msg.ConsumerGroup

	// One client per subscribed topic; the final length is known, so pre-size.
	clients := make([]*GroupClient, 0, len(msg.SubscriptionItems))
	for _, item := range msg.SubscriptionItems {
		clients = append(clients, &GroupClient{
			ENV:              hdr.Env,
			IDC:              hdr.Idc,
			SYS:              hdr.Sys,
			IP:               hdr.Ip,
			PID:              hdr.Pid,
			ConsumerGroup:    consumerGroup,
			Topic:            item.Topic,
			SubscriptionMode: item.Mode,
			GRPCType:         consts.STREAM,
			LastUPTime:       time.Now(),
			Emiter:           emiter,
		})
	}

	// All clients are registered with the manager before the group's consumer
	// is looked up; the first failure aborts the request.
	for _, cli := range clients {
		if err := consumerMgr.RegisterClient(cli); err != nil {
			return err
		}
	}

	meshConsumer, err := consumerMgr.GetConsumer(consumerGroup)
	if err != nil {
		return err
	}

	// Every client must be registered on the consumer even after one of them
	// has already reported true, so the loop never breaks early.
	requireRestart := false
	for _, cli := range clients {
		if meshConsumer.RegisterClient(cli) {
			requireRestart = true
		}
	}

	if requireRestart {
		log.Infof("ConsumerGroup %v topic info changed, restart EventMesh Consumer", consumerGroup)
		return consumerMgr.RestartConsumer(consumerGroup)
	}

	log.Warnf("EventMesh consumer [%v] didn't restart.", consumerGroup)
	return nil
}