func KeepHeartbeat()

in internal/remoting/heartbeat.go [54:93]


func KeepHeartbeat(ctx context.Context, actorSystem *actor.ActorSystem, appKey string, stopChan chan os.Signal) {
	var (
		taskMasterPool = masterpool.GetTaskMasterPool()
		groupManager   = discovery.GetGroupManager()
	)

	heartbeat := func(online bool) {
		_, actorSystemPort, err := actorSystem.GetHostPort()
		if err != nil {
			logger.Errorf("Write heartbeat to remote failed due to get actorSystem port failed, err=%s", err.Error())
			return
		}
		for groupId, appGroupId := range groupManager.GroupId2AppGroupIdMap() {
			jobInstanceIds := taskMasterPool.GetInstanceIds(appGroupId)
			heartbeatReq := genHeartBeatRequest(groupId, appGroupId, jobInstanceIds, actorSystemPort, online, appKey)
			if err := sendHeartbeat(ctx, heartbeatReq); err != nil {
				if errors.Is(err, syscall.EPIPE) || errors.Is(err, os.ErrDeadlineExceeded) {
					pool.GetConnPool().ReconnectTrigger() <- struct{}{}
				}
				logger.Warnf("Write heartbeat to server failed, had already re-connect with server, reason=%s", err.Error())
				continue
			}
			logger.Debugf("Write heartbeat to remote succeed.")
		}
	}
	heartbeat(true)

	ticker := time.NewTicker(heartbeatInterval)
	defer ticker.Stop()
	for {
		select {
		case <-stopChan:
			heartbeat(false)
			logger.Infof("Write shutdown heartbeat to remote succeed.")
			return
		case <-ticker.C:
			heartbeat(true)
		}
	}
}