func()

in runtime/core/protocol/grpc/consumer/consumer_processor.go [213:242]


// Heartbeat processes a heartbeat message sent by a subscribing client.
//
// The message is checked in three steps, and the first failing step ends the
// call with the matching response code together with a non-nil error:
//   - the header must pass validator.ValidateHeader
//     (grpc.EVENTMESH_PROTOCOL_HEADER_ERR),
//   - the message must pass validator.ValidateHeartBeat
//     (grpc.EVENTMESH_PROTOCOL_BODY_ERR),
//   - the client type must be pb.Heartbeat_SUB
//     (grpc.EVENTMESH_Heartbeat_Protocol_ERR).
//
// When all checks pass, one GroupClient is built per heartbeat item from the
// header fields, the message's consumer group and the item's topic, stamped
// with the current time, and handed to consumerMgr.UpdateClientTime. The call
// then returns a grpc.SUCCESS response and a nil error.
func (p *processor) Heartbeat(consumerMgr ConsumerManager, msg *pb.Heartbeat) (*pb.Response, error) {
	header := msg.Header

	headerErr := validator.ValidateHeader(header)
	if headerErr != nil {
		log.Warnf("invalid header:%v", headerErr)
		return buildPBResponse(grpc.EVENTMESH_PROTOCOL_HEADER_ERR), headerErr
	}

	bodyErr := validator.ValidateHeartBeat(msg)
	if bodyErr != nil {
		log.Warnf("invalid body:%v", bodyErr)
		return buildPBResponse(grpc.EVENTMESH_PROTOCOL_BODY_ERR), bodyErr
	}

	// Only subscriber heartbeats are accepted by this handler.
	if isSub := msg.ClientType == pb.Heartbeat_SUB; !isSub {
		log.Warnf("client type err, not sub")
		return buildPBResponse(grpc.EVENTMESH_Heartbeat_Protocol_ERR), fmt.Errorf("protocol not sub")
	}

	group := msg.ConsumerGroup
	items := msg.HeartbeatItems
	for idx := 0; idx < len(items); idx++ {
		// Each topic gets its own client record with its own timestamp.
		client := &GroupClient{
			ENV:           header.Env,
			IDC:           header.Idc,
			SYS:           header.Sys,
			IP:            header.Ip,
			PID:           header.Pid,
			ConsumerGroup: group,
			Topic:         items[idx].Topic,
			LastUPTime:    time.Now(),
		}
		consumerMgr.UpdateClientTime(client)
	}

	return buildPBResponse(grpc.SUCCESS), nil
}