in internal/actor/job_instance_actor.go [117:189]
// handleSubmitJobInstance processes a ServerSubmitJobInstanceRequest carried
// by msg.
//
// It always answers the server first with a ServerSubmitJobInstanceResponse:
// Success=false (with a message) when the task master pool already contains
// the job instance id, Success=true otherwise. Note that the positive
// acknowledgement is sent before the job itself is validated.
//
// For an accepted request the job name is read from the instance content
// ("className" for job type "java", "jobName" otherwise) and looked up among
// the pool's registered tasks:
//   - if no task is found, a failed WorkerReportJobInstanceStatusRequest is
//     queued on the at-least-once delivery channel;
//   - otherwise a task master matching the execute mode is created, put into
//     the pool and asked to submit the instance. An unknown execute mode is
//     only logged.
//
// The returned error is non-nil only when taskMaster.SubmitInstance fails.
// msg.Msg must hold a *schedulerx.ServerSubmitJobInstanceRequest; the type
// assertion below is unchecked and panics otherwise.
func (a *jobInstanceActor) handleSubmitJobInstance(actorCtx actor.Context, msg *actorcomm.SchedulerWrappedMsg) error {
	var (
		req            = msg.Msg.(*schedulerx.ServerSubmitJobInstanceRequest)
		taskMasterPool = masterpool.GetTaskMasterPool()
	)
	logger.Infof("handleSubmitJobInstance, jobInstanceId=%d, req=%+v", req.GetJobInstanceId(), req)

	// Reject the submission when this instance already has a task master.
	if taskMasterPool.Contains(req.GetJobInstanceId()) {
		errMsg := fmt.Sprintf("jobInstanceId=%d is still running!", req.GetJobInstanceId())
		// Pass the message as an argument, never as the format string.
		logger.Infof("%s", errMsg)
		resp := &schedulerx.ServerSubmitJobInstanceResponse{
			Success: proto.Bool(false),
			Message: proto.String(errMsg),
		}
		actorCtx.Send(actorcomm.SchedulerxServerPid(msg.Ctx), actorcomm.WrapSchedulerxMsg(msg.Ctx, resp, msg.SenderPath))
		return nil
	}

	// Acknowledge the submission before doing any further validation.
	resp := &schedulerx.ServerSubmitJobInstanceResponse{
		Success: proto.Bool(true),
	}
	actorCtx.Send(actorcomm.SchedulerxServerPid(msg.Ctx), actorcomm.WrapSchedulerxMsg(msg.Ctx, resp, msg.SenderPath))

	jobInstanceInfo := convert2JobInstanceInfo(req)

	// Check that the job is registered.
	jobName := gjson.Get(jobInstanceInfo.GetContent(), "jobName").String()
	// Compatible with the existing Java language configuration mechanism.
	if jobInstanceInfo.GetJobType() == "java" {
		jobName = gjson.Get(jobInstanceInfo.GetContent(), "className").String()
	}
	task, ok := masterpool.GetTaskMasterPool().Tasks().Find(jobName)
	if !ok || task == nil {
		logger.Errorf("handleSubmitJobInstance error, jobName=%s is unregistered. ", jobName)
		// Report the failed instance status with at-least-once delivery.
		statusReq := &schedulerx.WorkerReportJobInstanceStatusRequest{
			JobId:         proto.Int64(jobInstanceInfo.GetJobId()),
			JobInstanceId: proto.Int64(jobInstanceInfo.GetJobInstanceId()),
			Status:        proto.Int32(int32(processor.InstanceStatusFailed)),
			DeliveryId:    proto.Int64(utils.GetDeliveryId()),
			GroupId:       proto.String(jobInstanceInfo.GetGroupId()),
			Result:        proto.String(fmt.Sprintf("jobName=%s is unregistered", jobName)),
		}
		actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
			Msg: statusReq,
		}
		return nil
	}

	// Pick the task master implementation for the requested execute mode.
	var taskMaster taskmaster.TaskMaster
	switch common.ExecuteMode(jobInstanceInfo.GetExecuteMode()) {
	case common.StandaloneExecuteMode:
		taskMaster = master.NewStandaloneTaskMaster(jobInstanceInfo, actorCtx)
	case common.BroadcastExecuteMode:
		taskMaster = master.NewBroadcastTaskMaster(jobInstanceInfo, actorCtx)
	case common.BatchExecuteMode:
		taskMaster = master.NewBatchTaskMaster(jobInstanceInfo, actorCtx)
	case common.ParallelExecuteMode:
		taskMaster = master.NewParallelTaskMaster(jobInstanceInfo, actorCtx)
	case common.GridExecuteMode:
		taskMaster = master.NewGridTaskMaster(jobInstanceInfo, actorCtx)
	case common.ShardingExecuteMode:
		taskMaster = master.NewShardingTaskMaster(jobInstanceInfo, actorCtx)
	default:
		// Both verbs need an argument: the instance id and the execute mode.
		logger.Errorf("Submit jobInstanceId=%d failed, unknown executeMode=%s", jobInstanceInfo.GetJobInstanceId(), jobInstanceInfo.GetExecuteMode())
	}
	if taskMaster == nil {
		// No task master was created, so there is nothing to register or submit.
		return nil
	}

	// Register the task master before submitting the instance.
	masterpool.GetTaskMasterPool().Put(jobInstanceInfo.GetJobInstanceId(), taskMaster)
	if err := taskMaster.SubmitInstance(msg.Ctx, jobInstanceInfo); err != nil {
		return err
	}
	logger.Infof("Submit jobInstanceId=%d succeed", req.GetJobInstanceId())
	return nil
}