func()

in internal/master/map_task_master.go [286:349]


// checkWorkerAlive periodically verifies that the workers taking part in this
// job instance are still alive, and hands dead ones to handleWorkerShutdown.
//
// It blocks until m.GetInstanceStatus().IsFinished() reports true, so it is
// presumably run in its own goroutine (confirm with the caller). Each round:
//   - every worker reported by GetJobInstanceInfo().GetAllWorkers() is added
//     to aliveCheckWorkerSet;
//   - if that set is empty, all tasks of the job instance are marked
//     TaskStatusFailed through taskPersistence and the method returns;
//   - each worker's TCP address is probed up to probeAttempts times; a worker
//     that never accepts a connection is passed to handleWorkerShutdown;
//   - otherwise the worker's heartbeat actor is asked (MasterCheckWorkerAliveRequest)
//     whether this job instance is still alive there; on a negative answer the
//     worker is passed to handleWorkerShutdown and a
//     MasterDestroyContainerPoolRequest is queued on the at-least-once delivery
//     channel;
//   - the loop sleeps checkInterval before the next round.
//
// An RPC error or an unexpected reply for one worker is logged, and the
// remaining workers are still checked in the same round.
func (m *MapTaskMaster) checkWorkerAlive() {
	// probeAttempts: TCP connect attempts before a worker is declared down.
	// probeTimeout: bound on a single connect, so an unreachable host cannot
	// stall the round for the OS default connect timeout.
	// probeBackoff: pause between failed connect attempts.
	// rpcTimeout: timeout for the MasterCheckWorkerAliveRequest round trip.
	// checkInterval: pause between detection rounds.
	const (
		probeAttempts = 3
		probeTimeout  = 5 * time.Second
		probeBackoff  = 5 * time.Second
		rpcTimeout    = 10 * time.Second
		checkInterval = 10 * time.Second
	)

	for !m.GetInstanceStatus().IsFinished() {
		jobInstanceId := m.GetJobInstanceInfo().GetJobInstanceId()

		for _, worker := range m.GetJobInstanceInfo().GetAllWorkers() {
			m.aliveCheckWorkerSet.Add(worker)
		}
		if m.aliveCheckWorkerSet.Len() == 0 {
			logger.Warnf("worker list is empty, jobInstanceId=%d", jobInstanceId)
			m.taskPersistence.BatchUpdateTaskStatus(jobInstanceId, taskstatus.TaskStatusFailed, "", "")
			return
		}

		for _, workerIdAddr := range m.aliveCheckWorkerSet.ToStringSlice() {
			workerAddr := actorcomm.GetRealWorkerAddr(workerIdAddr)

			// Cheap reachability probe before spending an actor round trip.
			reachable := false
			for attempt := 0; attempt < probeAttempts; attempt++ {
				conn, err := net.DialTimeout("tcp", workerAddr, probeTimeout)
				if err == nil {
					logger.Debugf("socket to %s is reachable, times=%d", workerAddr, attempt)
					// Only reachability matters here; a close error carries no signal.
					_ = conn.Close()
					reachable = true
					break
				}
				logger.Warnf("socket to %s is not reachable, times=%d", workerAddr, attempt)
				// Waiting after the final attempt would only delay failover.
				if attempt < probeAttempts-1 {
					time.Sleep(probeBackoff)
				}
			}
			if !reachable {
				logger.Warnf("worker[%s] is down, start to remove this worker and failover tasks, jobInstanceId=%d", workerIdAddr, jobInstanceId)
				m.handleWorkerShutdown(workerIdAddr)
				continue
			}

			request := &schedulerx.MasterCheckWorkerAliveRequest{
				JobInstanceId: proto.Int64(jobInstanceId),
				DispatchMode:  proto.String(m.xAttrs.GetTaskDispatchMode()),
			}
			response, err := m.actorContext.RequestFuture(actorcomm.GetHeartbeatActorPid(workerAddr), request, rpcTimeout).Result()
			if err != nil {
				// Move on so one failing worker does not stop the others from being checked this round.
				logger.Errorf("check worker error, jobInstanceId=%d, err=%s", jobInstanceId, err.Error())
				continue
			}
			resp, ok := response.(*schedulerx.MasterCheckWorkerAliveResponse)
			if !ok {
				logger.Errorf("check worker error, jobInstanceId=%d, unexpected response type %T", jobInstanceId, response)
				continue
			}
			if resp.GetSuccess() {
				continue
			}

			logger.Warnf("jobInstanceId=%d of worker=%s is not alive, remote worker resp=%+v", jobInstanceId, workerIdAddr, resp.GetMessage())
			m.handleWorkerShutdown(workerIdAddr)

			// destroy containers of worker of PullModel
			// NOTE(review): WorkerIdAddr is filled with the real address (workerAddr),
			// not workerIdAddr; confirm which form the receiver of this request expects.
			destroyContainerPoolRequest := &schedulerx.MasterDestroyContainerPoolRequest{
				JobInstanceId: proto.Int64(jobInstanceId),
				JobId:         proto.Int64(m.GetJobInstanceInfo().GetJobId()),
				WorkerIdAddr:  proto.String(workerAddr),
				SerialNum:     proto.Int64(m.GetSerialNum()),
			}
			actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
				Msg: destroyContainerPoolRequest,
			}
		}

		// Worker detection is performed every checkInterval.
		time.Sleep(checkInterval)
	}
}