in internal/master/map_task_master.go [286:349]
// checkWorkerAlive periodically verifies that every worker of the job
// instance is still alive, and fails over the tasks of workers that are not.
//
// The method loops until the instance status reports finished. Each round it:
//
//  1. Adds all workers of the job instance to aliveCheckWorkerSet. If the set
//     is still empty, it marks the instance's tasks as failed and returns.
//  2. For every worker in the set, probes the worker address over TCP (up to
//     maxDialAttempts times). A worker that never accepts a connection is
//     handed to handleWorkerShutdown.
//  3. For reachable workers, sends a MasterCheckWorkerAliveRequest to the
//     worker's heartbeat actor. If the worker answers with success=false it is
//     handed to handleWorkerShutdown and a MasterDestroyContainerPoolRequest
//     is queued on the at-least-once delivery channel.
//  4. Sleeps checkInterval before the next round.
//
// The method blocks for the lifetime of the job instance (presumably it is
// started in its own goroutine; confirm against the caller).
func (m *MapTaskMaster) checkWorkerAlive() {
	const (
		maxDialAttempts = 3                // TCP probes before a worker is declared down
		dialTimeout     = 5 * time.Second  // upper bound for a single TCP probe
		dialRetryDelay  = 5 * time.Second  // pause between failed TCP probes
		rpcTimeout      = 10 * time.Second // upper bound for the heartbeat actor reply
		checkInterval   = 10 * time.Second // pause between two detection rounds
	)

	for !m.GetInstanceStatus().IsFinished() {
		jobInstanceId := m.GetJobInstanceInfo().GetJobInstanceId()

		for _, worker := range m.GetJobInstanceInfo().GetAllWorkers() {
			m.aliveCheckWorkerSet.Add(worker)
		}
		if m.aliveCheckWorkerSet.Len() == 0 {
			// No worker left to run anything: fail the tasks and stop checking.
			logger.Warnf("worker list is empty, jobInstanceId=%d", jobInstanceId)
			m.taskPersistence.BatchUpdateTaskStatus(jobInstanceId, taskstatus.TaskStatusFailed, "", "")
			break
		}

		for _, workerIdAddr := range m.aliveCheckWorkerSet.ToStringSlice() {
			workerAddr := actorcomm.GetRealWorkerAddr(workerIdAddr)

			// Step 1: plain TCP reachability probe. The dial is bounded by
			// dialTimeout so that an unroutable worker cannot stall the whole
			// round for the operating system's default connect timeout.
			reachable := false
			for times := 0; times < maxDialAttempts; times++ {
				conn, err := net.DialTimeout("tcp", workerAddr, dialTimeout)
				if err == nil {
					logger.Debugf("socket to %s is reachable, times=%d", workerAddr, times)
					// Only reachability matters here; a close error is irrelevant.
					conn.Close()
					reachable = true
					break
				}
				logger.Warnf("socket to %s is not reachable, times=%d", workerAddr, times)
				time.Sleep(dialRetryDelay)
			}
			if !reachable {
				logger.Warnf("worker[%s] is down, start to remove this worker and failover tasks, jobInstanceId=%d", workerIdAddr, jobInstanceId)
				m.handleWorkerShutdown(workerIdAddr)
				continue
			}

			// Step 2: ask the worker whether it still knows this job instance.
			request := &schedulerx.MasterCheckWorkerAliveRequest{
				JobInstanceId: proto.Int64(jobInstanceId),
				DispatchMode:  proto.String(m.xAttrs.GetTaskDispatchMode()),
			}
			response, err := m.actorContext.RequestFuture(actorcomm.GetHeartbeatActorPid(workerAddr), request, rpcTimeout).Result()
			if err != nil {
				// NOTE(review): an RPC error aborts the rest of this round, so the
				// remaining workers are only checked after the next sleep. Kept
				// as in the original; confirm whether `continue` was intended.
				logger.Errorf("check worker error, jobInstanceId=%d, err=%s", jobInstanceId, err.Error())
				break
			}
			// Use the comma-ok form: an unexpected reply type must not panic
			// this background loop. It is treated like an RPC error.
			resp, ok := response.(*schedulerx.MasterCheckWorkerAliveResponse)
			if !ok {
				logger.Errorf("check worker error, jobInstanceId=%d, worker=%s, unexpected response type=%T", jobInstanceId, workerIdAddr, response)
				break
			}
			if resp.GetSuccess() {
				continue
			}

			logger.Warnf("jobInstanceId=%d of worker=%s is not alive, remote worker resp=%+v", jobInstanceId, workerIdAddr, resp.GetMessage())
			m.handleWorkerShutdown(workerIdAddr)

			// destroy containers of worker of PullModel
			// NOTE(review): the WorkerIdAddr field is filled with workerAddr (the
			// real address), not workerIdAddr. Kept as in the original; confirm
			// this is what the receiver expects.
			destroyContainerPoolRequest := &schedulerx.MasterDestroyContainerPoolRequest{
				JobInstanceId: proto.Int64(jobInstanceId),
				JobId:         proto.Int64(m.GetJobInstanceInfo().GetJobId()),
				WorkerIdAddr:  proto.String(workerAddr),
				SerialNum:     proto.Int64(m.GetSerialNum()),
			}
			actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
				Msg: destroyContainerPoolRequest,
			}
		}

		// Worker detection is performed every 10 seconds
		time.Sleep(checkInterval)
	}
}