in runtime/core/protocol/grpc/consumer/consumer_manager.go [128:150]
// RestartConsumer restarts the consumer registered under consumerGroup.
//
// If no consumer is registered for the group the call is a no-op and nil is
// returned. Otherwise a consumer that currently reports consts.RUNNING is shut
// down first, and the consumer is then re-initialized and started. The first
// error returned by Shutdown, Init or Start is returned unchanged.
//
// If the consumer does not report consts.RUNNING after Start, a warning is
// logged and the consumer is removed from the manager; nil is still returned
// in that case.
func (c *consumerManager) RestartConsumer(consumerGroup string) error {
	val, ok := c.consumers.Load(consumerGroup)
	if !ok {
		// Nothing registered for this group, so there is nothing to restart.
		return nil
	}
	// NOTE(review): assumes every value stored in c.consumers is an
	// EventMeshConsumer; the assertion panics otherwise — confirm at the
	// store site.
	emconsumer := val.(EventMeshConsumer)

	// Only a running consumer is shut down before re-initialization.
	if emconsumer.ServiceState() == consts.RUNNING {
		if err := emconsumer.Shutdown(); err != nil {
			return err
		}
	}
	if err := emconsumer.Init(); err != nil {
		return err
	}
	if err := emconsumer.Start(); err != nil {
		return err
	}

	// Read the state once so the logged value is the one that was checked.
	if state := emconsumer.ServiceState(); state != consts.RUNNING {
		// Log the state itself: the method must be called here, since passing
		// the method value would print a function address.
		log.Warnf("restart eventmesh consumer failed, status:%v", state)
		c.consumers.Delete(consumerGroup)
	}
	return nil
}