func()

in internal/actor/job_instance_actor.go [117:189]


func (a *jobInstanceActor) handleSubmitJobInstance(actorCtx actor.Context, msg *actorcomm.SchedulerWrappedMsg) error {
	var (
		req            = msg.Msg.(*schedulerx.ServerSubmitJobInstanceRequest)
		taskMasterPool = masterpool.GetTaskMasterPool()
	)
	logger.Infof("handleSubmitJobInstance, jobInstanceId=%d, req=%+v", req.GetJobInstanceId(), req)
	if taskMasterPool.Contains(req.GetJobInstanceId()) {
		errMsg := fmt.Sprintf("jobInstanceId=%d is still running!", req.GetJobInstanceId())
		logger.Infof(errMsg)
		resp := &schedulerx.ServerSubmitJobInstanceResponse{
			Success: proto.Bool(false),
			Message: proto.String(errMsg),
		}
		actorCtx.Send(actorcomm.SchedulerxServerPid(msg.Ctx), actorcomm.WrapSchedulerxMsg(msg.Ctx, resp, msg.SenderPath))
	} else {
		resp := &schedulerx.ServerSubmitJobInstanceResponse{
			Success: proto.Bool(true),
		}
		actorCtx.Send(actorcomm.SchedulerxServerPid(msg.Ctx), actorcomm.WrapSchedulerxMsg(msg.Ctx, resp, msg.SenderPath))
		jobInstanceInfo := convert2JobInstanceInfo(req)

		// check job is registered
		jobName := gjson.Get(jobInstanceInfo.GetContent(), "jobName").String()
		// Compatible with the existing Java language configuration mechanism
		if jobInstanceInfo.GetJobType() == "java" {
			jobName = gjson.Get(jobInstanceInfo.GetContent(), "className").String()
		}
		task, ok := masterpool.GetTaskMasterPool().Tasks().Find(jobName)
		if !ok || task == nil {
			logger.Errorf("handleSubmitJobInstance error, jobName=%s is unregistered. ", jobName)

			// report job instance status with at-least-once-delivery
			req := &schedulerx.WorkerReportJobInstanceStatusRequest{
				JobId:         proto.Int64(jobInstanceInfo.GetJobId()),
				JobInstanceId: proto.Int64(jobInstanceInfo.GetJobInstanceId()),
				Status:        proto.Int32(int32(processor.InstanceStatusFailed)),
				DeliveryId:    proto.Int64(utils.GetDeliveryId()),
				GroupId:       proto.String(jobInstanceInfo.GetGroupId()),
				Result:        proto.String(fmt.Sprintf("jobName=%s is unregistered", jobName)),
			}
			actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
				Msg: req,
			}
		} else {
			var taskMaster taskmaster.TaskMaster
			switch common.ExecuteMode(jobInstanceInfo.GetExecuteMode()) {
			case common.StandaloneExecuteMode:
				taskMaster = master.NewStandaloneTaskMaster(jobInstanceInfo, actorCtx)
			case common.BroadcastExecuteMode:
				taskMaster = master.NewBroadcastTaskMaster(jobInstanceInfo, actorCtx)
			case common.BatchExecuteMode:
				taskMaster = master.NewBatchTaskMaster(jobInstanceInfo, actorCtx)
			case common.ParallelExecuteMode:
				taskMaster = master.NewParallelTaskMaster(jobInstanceInfo, actorCtx)
			case common.GridExecuteMode:
				taskMaster = master.NewGridTaskMaster(jobInstanceInfo, actorCtx)
			case common.ShardingExecuteMode:
				taskMaster = master.NewShardingTaskMaster(jobInstanceInfo, actorCtx)
			default:
				logger.Errorf("Submit jobInstanceId=%d failed, unknown executeMode=%s", jobInstanceInfo.GetExecuteMode())
			}

			if taskMaster != nil {
				masterpool.GetTaskMasterPool().Put(jobInstanceInfo.GetJobInstanceId(), taskMaster)
				if err := taskMaster.SubmitInstance(msg.Ctx, jobInstanceInfo); err != nil {
					return err
				}
				logger.Infof("Submit jobInstanceId=%d succeed", req.GetJobInstanceId())
			}
		}
	}
	return nil
}