in runtime/core/protocol/grpc/consumer/consumer_processor.go [159:211]
// SubscribeStream handles a stream-type subscription request.
//
// The message header and then the subscription body are validated; when
// either check fails, the matching protocol error code is sent through
// emiter and the validation error is returned. Otherwise one GroupClient is
// built per subscription item, each is registered with consumerMgr, and then
// each is registered with the consumer that consumerMgr returns for the
// message's consumer group. If any of those second registrations reports
// true, the consumer for the group is restarted and the result of
// consumerMgr.RestartConsumer is returned; otherwise nil is returned.
//
// Errors from consumerMgr.RegisterClient and consumerMgr.GetConsumer are
// returned as-is without sending a stream response.
// NOTE(review): clients registered before such a failure are not rolled back
// in this function — confirm that callers tolerate partial registration.
func (p *processor) SubscribeStream(consumerMgr ConsumerManager, emiter emitter.EventEmitter, msg *pb.Subscription) error {
	hdr := msg.Header
	if err := validator.ValidateHeader(hdr); err != nil {
		log.Warnf("invalid header:%v", err)
		emiter.SendStreamResp(hdr, grpc.EVENTMESH_PROTOCOL_HEADER_ERR)
		return err
	}
	if err := validator.ValidateSubscription(consts.STREAM, msg); err != nil {
		log.Warnf("invalid body:%v", err)
		emiter.SendStreamResp(hdr, grpc.EVENTMESH_PROTOCOL_BODY_ERR)
		return err
	}

	consumerGroup := msg.ConsumerGroup

	// One client per subscribed topic; the final length is known, so the
	// slice is sized once instead of growing on every append.
	clients := make([]*GroupClient, 0, len(msg.SubscriptionItems))
	for _, item := range msg.SubscriptionItems {
		clients = append(clients, &GroupClient{
			ENV:              hdr.Env,
			IDC:              hdr.Idc,
			SYS:              hdr.Sys,
			IP:               hdr.Ip,
			PID:              hdr.Pid,
			ConsumerGroup:    consumerGroup,
			Topic:            item.Topic,
			SubscriptionMode: item.Mode,
			GRPCType:         consts.STREAM,
			LastUPTime:       time.Now(),
			Emiter:           emiter,
		})
	}

	// Every client is registered with the manager before the group's
	// consumer is looked up; this ordering is kept from the original code.
	for _, cli := range clients {
		if err := consumerMgr.RegisterClient(cli); err != nil {
			return err
		}
	}

	meshConsumer, err := consumerMgr.GetConsumer(consumerGroup)
	if err != nil {
		return err
	}

	// Register all clients on the consumer; no early exit, so every client
	// is registered even after one has already requested a restart.
	requireRestart := false
	for _, cli := range clients {
		if meshConsumer.RegisterClient(cli) {
			requireRestart = true
		}
	}

	if !requireRestart {
		log.Warnf("EventMesh consumer [%v] didn't restart.", consumerGroup)
		return nil
	}

	log.Infof("ConsumerGroup %v topic info changed, restart EventMesh Consumer", consumerGroup)
	return consumerMgr.RestartConsumer(consumerGroup)
}