func()

in internal/master/common_update_instance_status_handler.go [47:124]


func (rcvr *commonUpdateInstanceStatusHandler) Handle(serialNum int64, instanceStatus processor.InstanceStatus, result string) error {
	jobInstanceId := rcvr.jobInstanceInfo.GetJobInstanceId()
	uniqueId := utils.GetUniqueIdWithoutTaskId(rcvr.jobInstanceInfo.GetJobId(), jobInstanceId)

	if rcvr.taskMaster.GetInstanceStatus() != instanceStatus {
		rcvr.taskMaster.SetInstanceStatus(instanceStatus)
		if instanceStatus.IsFinished() {
			postResult := rcvr.taskMaster.PostFinish(jobInstanceId)
			if postResult != nil {
				if instanceStatus == processor.InstanceStatusSucceed && postResult.Status() == processor.InstanceStatusFailed {
					instanceStatus = processor.InstanceStatusFailed
				}
				if postResult.Result() != "" {
					result = postResult.Result()
				}
			}

			// report job instance status with at-least-once-delivery
			req := &schedulerx.WorkerReportJobInstanceStatusRequest{
				JobId:         proto.Int64(rcvr.jobInstanceInfo.GetJobId()),
				JobInstanceId: proto.Int64(jobInstanceId),
				Status:        proto.Int32(int32(instanceStatus)),
				DeliveryId:    proto.Int64(utils.GetDeliveryId()),
				GroupId:       proto.String(rcvr.jobInstanceInfo.GetGroupId()),
			}
			if result != "" {
				req.Result = proto.String(result)
			}
			progress, err := rcvr.taskMaster.GetJobInstanceProgress()
			if err == nil {
				req.Progress = proto.String(progress)
			} else {
				logger.Warnf("report job instance status with at-least-once-delivery failed, due to GetJobInstanceProgress is empty")
			}

			actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
				Msg: req,
			}
			logger.Infof("report jobInstance=%d, status=%d to AtLeastDeliveryRoutingActor", jobInstanceId, instanceStatus)

			// destroy containers and taskMaster
			if !config.GetWorkerConfig().IsShareContainerPool() {
				rcvr.taskMaster.DestroyContainerPool()
			}
			if taskMaster := rcvr.masterPool.Get(jobInstanceId); taskMaster != nil {
				taskMaster.Stop()
				rcvr.masterPool.Remove(jobInstanceId)
			}

			logger.Infof("uniqueId: %s is finished, remove from MasterPool.", uniqueId)
		}
	}
	progress, err := rcvr.taskMaster.GetJobInstanceProgress()
	if err != nil {
		logger.Warnf("report job instance status with at-least-once-delivery failed, due to GetJobInstanceProgress is empty")
	}

	_, ok := rcvr.taskMaster.(*StandaloneTaskMaster)
	if ok && !instanceStatus.IsFinished() && progress != "" {
		// report job instance status with at-least-once-delivery
		reportStatusReq := &schedulerx.WorkerReportJobInstanceStatusRequest{
			JobId:         proto.Int64(rcvr.jobInstanceInfo.GetJobId()),
			JobInstanceId: proto.Int64(jobInstanceId),
			Status:        proto.Int32(int32(instanceStatus)),
			Progress:      proto.String(progress),
			DeliveryId:    proto.Int64(utils.GetDeliveryId()),
			GroupId:       proto.String(rcvr.jobInstanceInfo.GetGroupId()),
		}
		if result != "" {
			reportStatusReq.Result = proto.String(result)
		}
		actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
			Msg: reportStatusReq,
		}
		logger.Infof("report jobInstance=%d, status=%s to AtLeastDeliveryRoutingActor", jobInstanceId, instanceStatus.Descriptor())
	}
	return nil
}