func()

in internal/master/broadcast_task_master.go [356:407]


// checkWorkerAlive periodically probes the heartbeat actor of every worker in
// aliveCheckWorkerSet until the job instance reaches a finished status.
//
// Each round (only while isMonitor() reports true):
//  1. every entry of allWorkers is added to aliveCheckWorkerSet;
//  2. a MasterCheckWorkerAliveRequest is sent to each worker's heartbeat actor
//     and awaited with a timeout;
//  3. if the request fails, existInvalidWorker is set and the task recorded for
//     that worker in worker2uniqueIdMap is reported as failed.
//
// The loop sleeps between rounds regardless of whether monitoring is enabled,
// so it never spins while isMonitor() is false.
func (m *BroadcastTaskMaster) checkWorkerAlive() {
	const (
		// checkInterval is the pause between two probing rounds.
		checkInterval = 10 * time.Second
		// requestTimeout bounds how long a single heartbeat probe may take.
		requestTimeout = 10 * time.Second
	)

	for !m.isInstanceStatusFinished() {
		if m.isMonitor() {
			for _, worker := range m.allWorkers {
				m.aliveCheckWorkerSet.Add(worker)
			}

			for _, workerIdAddr := range m.aliveCheckWorkerSet.ToStringSlice() {
				req := &schedulerx.MasterCheckWorkerAliveRequest{
					JobInstanceId: proto.Int64(m.jobInstanceInfo.GetJobInstanceId()),
				}

				workerAddr := actorcomm.GetRealWorkerAddr(workerIdAddr)
				pid := actorcomm.GetHeartbeatActorPid(workerAddr)
				if _, err := m.actorContext.RequestFuture(pid, req, requestTimeout).Result(); err != nil {
					m.existInvalidWorker = true
					m.failTaskOfDeadWorker(workerIdAddr, workerAddr)
				}
			}
		}

		// Always sleep, including when monitoring is disabled; otherwise the
		// loop would busy-wait on isMonitor() and burn a CPU core.
		time.Sleep(checkInterval)
	}
}

// failTaskOfDeadWorker marks the task recorded for workerIdAddr in
// worker2uniqueIdMap as failed via UpdateTaskStatus.
//
// The stored unique id is expected to be a string whose first three
// utils.SplitterToken-separated fields are jobId, jobInstanceId and taskId.
// If the worker has no entry, or the entry is malformed, the problem is
// logged and no status update is sent.
func (m *BroadcastTaskMaster) failTaskOfDeadWorker(workerIdAddr, workerAddr string) {
	uniqueId, ok := m.worker2uniqueIdMap.Load(workerIdAddr)
	if !ok {
		// Log the worker key: uniqueId is nil here and carries no information.
		logger.Errorf("can't found uniqueId of worker=%v", workerIdAddr)
		return
	}

	uniqueIdStr, ok := uniqueId.(string)
	if !ok {
		logger.Errorf("worker=%v is down, but uniqueId=%v is not a string, skip reporting failed status", workerAddr, uniqueId)
		return
	}

	tokens := strings.Split(uniqueIdStr, utils.SplitterToken)
	if len(tokens) < 3 {
		logger.Errorf("worker=%v is down, but uniqueId=%v is malformed, skip reporting failed status", workerAddr, uniqueIdStr)
		return
	}

	// ids holds jobId, jobInstanceId and taskId, in that order. Parse as
	// 64-bit explicitly so the ids are not truncated on 32-bit platforms.
	var ids [3]int64
	for i := range ids {
		v, err := strconv.ParseInt(tokens[i], 10, 64)
		if err != nil {
			logger.Errorf("worker=%v is down, but uniqueId=%v is malformed, skip reporting failed status, err=%s", workerAddr, uniqueIdStr, err.Error())
			return
		}
		ids[i] = v
	}

	// workerIdAddr has the form "<workerId>@<addr>"; Split always yields at
	// least one element, so indexing [0] is safe.
	workerId := strings.Split(workerIdAddr, "@")[0]

	req := &schedulerx.ContainerReportTaskStatusRequest{
		JobId:         proto.Int64(ids[0]),
		JobInstanceId: proto.Int64(ids[1]),
		TaskId:        proto.Int64(ids[2]),
		Status:        proto.Int32(int32(taskstatus.TaskStatusFailed)),
		WorkerId:      proto.String(workerId),
		WorkerAddr:    proto.String(workerAddr),
		SerialNum:     proto.Int64(m.GetSerialNum()),
	}
	if err := m.UpdateTaskStatus(req); err != nil {
		logger.Warnf("worker=%v is down, set=%v to failed status error, err=%s", workerAddr, uniqueId, err.Error())
		return
	}
	logger.Warnf("worker=%v is down, set=%v to failed", workerAddr, uniqueId)
}