func()

in internal/master/second_job_update_instance_status_handler.go [185:240]


// Handle processes a status update for the current cycle of a second-level job instance.
//
// The cycle id used for logging is built from the job id, the job instance id and the
// task master's own serial number. The serialNum argument is only forwarded to
// triggerNextCycle.
//
// Three cases are checked in order:
//
//  1. The master is not inited, instanceStatus is finished and the master has not been
//     killed: the master kills its own instance and Handle returns nil.
//  2. The master has been killed and result contains "killed" or "Worker master shutdown":
//     the master is marked failed, stopped and removed from the master pool. Unless result
//     is exactly "killed from server", a status report is then queued on the
//     at-least-once delivery channel. Handle returns nil without triggering another cycle.
//  3. Otherwise, if instanceStatus is finished, triggerNextCycle is called.
//
// Handle returns a non-nil error only in case 2, when getJobInstanceProgress fails. At that
// point the master has already been stopped and removed, and no report is queued. The
// underlying error is wrapped, so callers can inspect it with errors.Is / errors.As.
func (h *secondJobUpdateInstanceStatusHandler) Handle(serialNum int64, instanceStatus processor.InstanceStatus, result string) error {
	cycleId := utils.GetUniqueId(h.jobInstanceInfo.GetJobId(), h.jobInstanceInfo.GetJobInstanceId(), h.taskMaster.GetSerialNum())
	logger.Infof("cycleId: %v instanceStatus=%v cycle update status.", cycleId, instanceStatus)

	// If init failed, the instance status is finished and the master has not been killed yet,
	// the second-level job kills itself.
	if !h.taskMaster.IsInited() && instanceStatus.IsFinished() && !h.taskMaster.IsKilled() {
		h.taskMaster.KillInstance("killed, because of worker init failed.")
		logger.Warnf("Init failed need to kill self, cycleId=%v", cycleId)
		return nil
	}

	// If the instance is killed, it needs to be reported to the server.
	// Logically, checking whether the master has been killed should be enough; there is no
	// need to also check whether the result contains the specified text. The extra check is
	// kept for historical reasons and should not be removed in the short term.
	if h.taskMaster.IsKilled() &&
		(strings.Contains(result, "killed") || strings.Contains(result, "Worker master shutdown")) {
		h.taskMaster.SetInstanceStatus(processor.InstanceStatusFailed)
		h.taskMaster.Stop()
		h.masterPool.Remove(h.jobInstanceInfo.GetJobInstanceId())

		// There is no status feedback for the server-side forced stop operation.
		if result != "killed from server" {
			// NOTE(review): the master was just set to InstanceStatusFailed, but the reported
			// Status is read from jobInstanceInfo.GetStatus() — confirm this is the intended value.
			req := &schedulerx.WorkerReportJobInstanceStatusRequest{
				JobId:         proto.Int64(h.jobInstanceInfo.GetJobId()),
				JobInstanceId: proto.Int64(h.jobInstanceInfo.GetJobInstanceId()),
				Status:        proto.Int32(h.jobInstanceInfo.GetStatus()),
				GroupId:       proto.String(h.jobInstanceInfo.GetGroupId()),
			}
			// Result and Progress are optional: they are only set when non-empty.
			if result != "" {
				req.Result = proto.String(result)
			}
			progress, err := h.getJobInstanceProgress()
			if err != nil {
				// Wrap with %w so the cause stays reachable through errors.Is / errors.As;
				// the rendered message is the same as with %s.
				return fmt.Errorf("cycleId: %v instanceStatus=%v cycle update status failed due to getJobInstanceProgress failed, err=%w", cycleId, instanceStatus, err)
			}
			if progress != "" {
				req.Progress = proto.String(progress)
			}
			actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
				Msg: req,
			}

			// NOTE(review): this logs the incoming instanceStatus, not the Status placed in req.
			logger.Infof("report cycleId=%v, status=%v to AtLeastDeliveryRoutingActor", cycleId, instanceStatus)
		}

		// If the instance terminates, no further action is required.
		return nil
	}

	// The current cycle is finished: hand over to triggerNextCycle.
	if instanceStatus.IsFinished() {
		h.triggerNextCycle(cycleId, serialNum, instanceStatus)
	}
	return nil
}