in internal/master/sharding_task_master.go [57:109]
func (m *ShardingTaskMaster) SubmitInstance(ctx context.Context, jobInstanceInfo *common.JobInstanceInfo) error {
if err := m.parseShardingParameters(jobInstanceInfo); err != nil {
m.UpdateNewInstanceStatus(m.GetSerialNum(), processor.InstanceStatusFailed, err.Error())
return err
}
shardingNum := len(m.parameters)
startContainerRequests := make([]*schedulerx.MasterStartContainerRequest, 0, shardingNum)
for _, param := range m.parameters {
tokens := strings.Split(param, "=")
if len(tokens) != 2 {
errMsg := fmt.Sprintf("invalid sharding parameters, should be like 0=a,1=b,2=c")
m.UpdateNewInstanceStatus(m.GetSerialNum(), processor.InstanceStatusFailed, errMsg)
return fmt.Errorf(errMsg)
}
shardingId, err := strconv.Atoi(tokens[0])
if err != nil {
errMsg := fmt.Sprintf("invalid sharding parameters, shardingId is not digit, shardingId=%s", tokens[0])
m.UpdateNewInstanceStatus(m.GetSerialNum(), processor.InstanceStatusFailed, errMsg)
return fmt.Errorf(errMsg)
}
taskName := tokens[0] // taskName == shardingId
shardingParameter := tokens[1]
if _, ok := m.taskProgressMap.Load(taskName); ok {
errMsg := fmt.Sprintf("shardingId=%s is duplicated", taskName)
m.UpdateNewInstanceStatus(m.GetSerialNum(), processor.InstanceStatusFailed, errMsg)
return fmt.Errorf(errMsg)
}
task := common.NewShardingTask(int64(shardingId), shardingParameter)
taskObj, err := json.Marshal(task)
if err != nil {
errMsg := fmt.Sprintf("json marshal task failed, err=%s, task=%+v", err.Error(), task)
m.UpdateNewInstanceStatus(m.GetSerialNum(), processor.InstanceStatusFailed, errMsg)
return fmt.Errorf(errMsg)
}
req, err := m.convert2StartContainerRequest(jobInstanceInfo, int64(shardingId), taskName, taskObj, false)
if err != nil {
errMsg := fmt.Sprintf("convert2StartContainerRequest failed, err=%s, jobInstanceInfo=%+v", err.Error(), jobInstanceInfo)
m.UpdateNewInstanceStatus(m.GetSerialNum(), processor.InstanceStatusFailed, errMsg)
return fmt.Errorf(errMsg)
}
req.ShardingNum = proto.Int32(int32(shardingNum))
startContainerRequests = append(startContainerRequests, req)
}
m.startBatchHandler()
m.BatchDispatchTasks(startContainerRequests, "")
m.init()
return nil
}