in runtime/core/protocol/grpc/consumer/consumer_processor.go [213:242]
func (p *processor) Heartbeat(consumerMgr ConsumerManager, msg *pb.Heartbeat) (*pb.Response, error) {
hdr := msg.Header
if err := validator.ValidateHeader(hdr); err != nil {
log.Warnf("invalid header:%v", err)
return buildPBResponse(grpc.EVENTMESH_PROTOCOL_HEADER_ERR), err
}
if err := validator.ValidateHeartBeat(msg); err != nil {
log.Warnf("invalid body:%v", err)
return buildPBResponse(grpc.EVENTMESH_PROTOCOL_BODY_ERR), err
}
if msg.ClientType != pb.Heartbeat_SUB {
log.Warnf("client type err, not sub")
return buildPBResponse(grpc.EVENTMESH_Heartbeat_Protocol_ERR), fmt.Errorf("protocol not sub")
}
consumerGroup := msg.ConsumerGroup
for _, item := range msg.HeartbeatItems {
cli := &GroupClient{
ENV: hdr.Env,
IDC: hdr.Idc,
SYS: hdr.Sys,
IP: hdr.Ip,
PID: hdr.Pid,
ConsumerGroup: consumerGroup,
Topic: item.Topic,
LastUPTime: time.Now(),
}
consumerMgr.UpdateClientTime(cli)
}
return buildPBResponse(grpc.SUCCESS), nil
}