in runtime/core/protocol/grpc/consumer/consumer_processor.go [49:102]
// Subscribe processes a subscription request of the webhook gRPC type.
//
// The request header and body are validated first; a validation failure is
// logged and returned together with the matching protocol error response.
// One GroupClient is then built for every subscription item in msg and
// registered with consumerMgr. After that the consumer for the request's
// consumer group is looked up and every client is registered with it as
// well. If at least one of those registrations reports true, the consumer
// is restarted through consumerMgr; otherwise only a warning is logged.
//
// The returned response carries grpc.SUCCESS when every step completed, or
// the status code of the step that failed, in which case the error of that
// step is returned alongside it.
func (p *processor) Subscribe(consumerMgr ConsumerManager, msg *pb.Subscription) (*pb.Response, error) {
	header := msg.Header
	if headerErr := validator.ValidateHeader(header); headerErr != nil {
		log.Warnf("invalid header:%v", headerErr)
		return buildPBResponse(grpc.EVENTMESH_PROTOCOL_HEADER_ERR), headerErr
	}
	if bodyErr := validator.ValidateSubscription(consts.WEBHOOK, msg); bodyErr != nil {
		log.Warnf("invalid body:%v", bodyErr)
		return buildPBResponse(grpc.EVENTMESH_PROTOCOL_BODY_ERR), bodyErr
	}

	// Build one client descriptor per subscribed topic; all of them share
	// the caller identity taken from the header and the webhook URL.
	group := msg.ConsumerGroup
	webhookURL := msg.Url
	clients := make([]*GroupClient, len(msg.SubscriptionItems))
	for i, item := range msg.SubscriptionItems {
		clients[i] = &GroupClient{
			ENV:              header.Env,
			IDC:              header.Idc,
			SYS:              header.Sys,
			IP:               header.Ip,
			PID:              header.Pid,
			ConsumerGroup:    group,
			Topic:            item.Topic,
			SubscriptionMode: item.Mode,
			GRPCType:         consts.WEBHOOK,
			URL:              webhookURL,
			LastUPTime:       time.Now(),
		}
	}

	// Register with the manager first; stop at the first failure.
	for i := range clients {
		if regErr := consumerMgr.RegisterClient(clients[i]); regErr != nil {
			return buildPBResponse(grpc.EVENTMESH_Subscribe_Register_ERR), regErr
		}
	}

	consumer, lookupErr := consumerMgr.GetConsumer(group)
	if lookupErr != nil {
		return buildPBResponse(grpc.EVENTMESH_Consumer_NotFound_ERR), lookupErr
	}

	// Every client must be handed to the consumer, so the call is placed on
	// the left of || to keep it from being short-circuited away.
	changed := false
	for i := range clients {
		changed = consumer.RegisterClient(clients[i]) || changed
	}

	if !changed {
		log.Warnf("EventMesh consumer [%v] didn't restart.", group)
		return buildPBResponse(grpc.SUCCESS), nil
	}

	log.Infof("ConsumerGroup %v topic info changed, restart EventMesh Consumer", group)
	if restartErr := consumerMgr.RestartConsumer(group); restartErr != nil {
		return buildPBResponse(grpc.EVENTMESH_Consumer_NotFound_ERR), restartErr
	}
	return buildPBResponse(grpc.SUCCESS), nil
}