in internal/master/broadcast_task_master.go [356:407]
// checkWorkerAlive loops until the job instance reaches a finished status and,
// while monitoring is enabled, probes every known worker with a
// MasterCheckWorkerAliveRequest once per round. When a probe fails, the worker
// is flagged via existInvalidWorker and the task recorded for that worker in
// worker2uniqueIdMap is reported as failed through UpdateTaskStatus.
//
// The method blocks for as long as the instance is not finished.
// NOTE(review): presumably it is started in its own goroutine — confirm at the
// call site.
func (m *BroadcastTaskMaster) checkWorkerAlive() {
	const (
		// probeTimeout bounds how long a single liveness request may take.
		probeTimeout = 10 * time.Second
		// checkInterval is the pause between two full probe rounds.
		checkInterval = 10 * time.Second
		// idlePollInterval is the pause between isMonitor checks while
		// monitoring is switched off.
		idlePollInterval = time.Second
	)

	for !m.isInstanceStatusFinished() {
		if !m.isMonitor() {
			// Sleep instead of spinning: a bare `continue` here would keep a
			// CPU core busy for as long as monitoring is disabled.
			time.Sleep(idlePollInterval)
			continue
		}

		// Workers are only ever added to the check set, so a worker that has
		// been seen once keeps being probed in later rounds.
		for _, worker := range m.allWorkers {
			m.aliveCheckWorkerSet.Add(worker)
		}

		for _, workerIdAddr := range m.aliveCheckWorkerSet.ToStringSlice() {
			workerAddr := actorcomm.GetRealWorkerAddr(workerIdAddr)
			aliveReq := &schedulerx.MasterCheckWorkerAliveRequest{
				JobInstanceId: proto.Int64(m.jobInstanceInfo.GetJobInstanceId()),
			}
			_, err := m.actorContext.RequestFuture(actorcomm.GetHeartbeatActorPid(workerAddr), aliveReq, probeTimeout).Result()
			if err == nil {
				continue
			}

			// Any probe error (including a timeout) is treated as "worker down".
			m.existInvalidWorker = true

			value, ok := m.worker2uniqueIdMap.Load(workerIdAddr)
			if !ok {
				logger.Errorf("can't found uniqueId of workerIdAddr=%v", workerIdAddr)
				continue
			}
			uniqueId, ok := value.(string)
			if !ok {
				logger.Errorf("unexpected uniqueId type=%T of workerIdAddr=%v", value, workerIdAddr)
				continue
			}

			// The unique id carries jobId, jobInstanceId and taskId, in that
			// order, separated by utils.SplitterToken.
			tokens := strings.Split(uniqueId, utils.SplitterToken)
			if len(tokens) < 3 {
				logger.Errorf("malformed uniqueId=%v of worker=%v, expect at least 3 tokens", uniqueId, workerAddr)
				continue
			}
			// ParseInt with bitSize 64 keeps the full id range on every
			// platform, unlike Atoi whose result is a platform-sized int.
			jobId, jobErr := strconv.ParseInt(tokens[0], 10, 64)
			jobInstanceId, instErr := strconv.ParseInt(tokens[1], 10, 64)
			taskId, taskErr := strconv.ParseInt(tokens[2], 10, 64)
			if jobErr != nil || instErr != nil || taskErr != nil {
				// Do not report a failure for ids that could not be parsed;
				// that would update a task with bogus zero ids.
				logger.Errorf("malformed uniqueId=%v of worker=%v, ids are not numeric", uniqueId, workerAddr)
				continue
			}

			// workerIdAddr has the form "<workerId>@<address>"; Split always
			// yields at least one element, so index 0 is safe.
			workerId := strings.Split(workerIdAddr, "@")[0]

			failedReq := &schedulerx.ContainerReportTaskStatusRequest{
				JobId:         proto.Int64(jobId),
				JobInstanceId: proto.Int64(jobInstanceId),
				TaskId:        proto.Int64(taskId),
				Status:        proto.Int32(int32(taskstatus.TaskStatusFailed)),
				WorkerId:      proto.String(workerId),
				WorkerAddr:    proto.String(workerAddr),
				SerialNum:     proto.Int64(m.GetSerialNum()),
			}
			if updateErr := m.UpdateTaskStatus(failedReq); updateErr != nil {
				logger.Warnf("worker=%v is down, set=%v to failed status error, err=%s", workerAddr, uniqueId, updateErr.Error())
				continue
			}
			logger.Warnf("worker=%v is down, set=%v to failed", workerAddr, uniqueId)
		}

		time.Sleep(checkInterval)
	}
}