in internal/remoting/heartbeat.go [54:93]
// KeepHeartbeat periodically reports this worker to the server and blocks
// until it is asked to stop.
//
// One online heartbeat round is sent immediately, then one per
// heartbeatInterval tick. A round sends a separate request for every
// (groupId, appGroupId) pair returned by the group manager; each request
// carries the instance ids the task master pool returns for that app group,
// the actor system's port, the online flag and appKey.
//
// The function returns when either:
//   - a value is received from stopChan: a final offline heartbeat round is
//     attempted first and its real outcome is logged; or
//   - ctx is done: the loop exits without sending anything further, so the
//     calling goroutine is not leaked after cancellation.
//
// A failure for one group does not stop the round; the remaining groups are
// still attempted.
func KeepHeartbeat(ctx context.Context, actorSystem *actor.ActorSystem, appKey string, stopChan chan os.Signal) {
	var (
		taskMasterPool = masterpool.GetTaskMasterPool()
		groupManager   = discovery.GetGroupManager()
	)

	// heartbeat sends one round and reports whether every request in the round
	// was written successfully. It returns false when the actor system port
	// cannot be resolved or when at least one send fails.
	heartbeat := func(online bool) bool {
		_, actorSystemPort, err := actorSystem.GetHostPort()
		if err != nil {
			logger.Errorf("Write heartbeat to remote failed due to get actorSystem port failed, err=%s", err.Error())
			return false
		}

		allSent := true
		for groupID, appGroupID := range groupManager.GroupId2AppGroupIdMap() {
			jobInstanceIDs := taskMasterPool.GetInstanceIds(appGroupID)
			heartbeatReq := genHeartBeatRequest(groupID, appGroupID, jobInstanceIDs, actorSystemPort, online, appKey)

			sendErr := sendHeartbeat(ctx, heartbeatReq)
			if sendErr == nil {
				logger.Debugf("Write heartbeat to remote succeed.")
				continue
			}
			allSent = false

			// Only a broken pipe or an expired deadline requests a reconnect;
			// any other failure is reported as-is.
			if !errors.Is(sendErr, syscall.EPIPE) && !errors.Is(sendErr, os.ErrDeadlineExceeded) {
				logger.Warnf("Write heartbeat to server failed, reason=%s", sendErr.Error())
				continue
			}

			// Hand the reconnect request over, but give up if ctx is cancelled
			// so a missing receiver cannot wedge the heartbeat loop forever.
			select {
			case pool.GetConnPool().ReconnectTrigger() <- struct{}{}:
				logger.Warnf("Write heartbeat to server failed, re-connect with server has been triggered, reason=%s", sendErr.Error())
			case <-ctx.Done():
				logger.Warnf("Write heartbeat to server failed, re-connect skipped because context is done, reason=%s", sendErr.Error())
			}
		}
		return allSent
	}

	heartbeat(true)

	ticker := time.NewTicker(heartbeatInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			logger.Infof("Stop writing heartbeat to remote because context is done, reason=%s", ctx.Err().Error())
			return
		case <-stopChan:
			// Tell the server we are going offline and log what really happened.
			if heartbeat(false) {
				logger.Infof("Write shutdown heartbeat to remote succeed.")
			} else {
				logger.Warnf("Write shutdown heartbeat to remote failed.")
			}
			return
		case <-ticker.C:
			heartbeat(true)
		}
	}
}