in internal/master/second_job_update_instance_status_handler.go [185:240]
// Handle processes a status update for one cycle of a second-level job instance.
//
// The update is handled by the first matching case:
//   - The master is not inited, the reported status is finished and the master
//     has not been killed: the master kills its own instance and Handle returns.
//   - The master has been killed and result contains "killed" or
//     "Worker master shutdown": the instance status is set to failed, the
//     master is stopped and removed from the master pool and, unless result is
//     exactly "killed from server", a WorkerReportJobInstanceStatusRequest is
//     sent on the actorcomm.AtLeastOnceDeliveryMsgReceiver channel.
//   - Otherwise, if the reported status is finished, triggerNextCycle is called.
//
// serialNum is only forwarded to triggerNextCycle; the cycle id used in logs
// and errors is built from the task master's own serial number.
//
// A non-nil error is returned only when getJobInstanceProgress fails while the
// kill report is being built. The underlying error is wrapped, so callers can
// inspect it with errors.Is / errors.As.
func (h *secondJobUpdateInstanceStatusHandler) Handle(serialNum int64, instanceStatus processor.InstanceStatus, result string) error {
	cycleId := utils.GetUniqueId(h.jobInstanceInfo.GetJobId(), h.jobInstanceInfo.GetJobInstanceId(), h.taskMaster.GetSerialNum())
	logger.Infof("cycleId: %v instanceStatus=%v cycle update status.", cycleId, instanceStatus)

	// If init failed, the instance status is finished and the master has not
	// been killed yet, the second job kills itself.
	if !h.taskMaster.IsInited() && instanceStatus.IsFinished() && !h.taskMaster.IsKilled() {
		h.taskMaster.KillInstance("killed, because of worker init failed.")
		logger.Warnf("Init failed need to kill self, cycleId=%v", cycleId)
		return nil
	}

	// If the instance is killed, it needs to be reported to the server.
	// Logically, checking whether the master has been killed would be enough;
	// the additional check on the result text is kept for historical reasons
	// and should not be removed in the short term.
	if h.taskMaster.IsKilled() &&
		(strings.Contains(result, "killed") || strings.Contains(result, "Worker master shutdown")) {
		h.taskMaster.SetInstanceStatus(processor.InstanceStatusFailed)
		h.taskMaster.Stop()
		h.masterPool.Remove(h.jobInstanceInfo.GetJobInstanceId())

		// There is no status feedback for the server-side forced stop operation.
		if result != "killed from server" {
			// NOTE(review): Status is read from jobInstanceInfo rather than from
			// the instanceStatus argument - confirm this is intended.
			req := &schedulerx.WorkerReportJobInstanceStatusRequest{
				JobId:         proto.Int64(h.jobInstanceInfo.GetJobId()),
				JobInstanceId: proto.Int64(h.jobInstanceInfo.GetJobInstanceId()),
				Status:        proto.Int32(h.jobInstanceInfo.GetStatus()),
				GroupId:       proto.String(h.jobInstanceInfo.GetGroupId()),
			}
			// Result and Progress are only set when they are non-empty.
			if result != "" {
				req.Result = proto.String(result)
			}

			progress, err := h.getJobInstanceProgress()
			if err != nil {
				// NOTE(review): at this point the master has already been stopped
				// and removed from the pool, and no report is sent on this path.
				// Wrap with %w so the cause stays reachable through the error chain.
				return fmt.Errorf("cycleId: %v instanceStatus=%v cycle update status failed due to getJobInstanceProgress failed, err=%w", cycleId, instanceStatus, err)
			}
			if progress != "" {
				req.Progress = proto.String(progress)
			}

			actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
				Msg: req,
			}
			logger.Infof("report cycleId=%v, status=%v to AtLeastDeliveryRoutingActor", cycleId, instanceStatus)
		}

		// If the instance terminates, no further action is required.
		return nil
	}

	// A finished cycle hands over to triggerNextCycle.
	if instanceStatus.IsFinished() {
		h.triggerNextCycle(cycleId, serialNum, instanceStatus)
	}
	return nil
}