func()

in internal/master/sharding_task_master.go [57:109]


// SubmitInstance validates the sharding parameters of a job instance, builds
// one start-container request per shard and dispatches them as a single batch.
//
// After parseShardingParameters succeeds, every entry of m.parameters must
// have the form "<shardingId>=<parameter>" (for example "0=a"), where
// shardingId is a decimal integer. The shardingId text is also used as the
// task name, and a shardingId already present in m.taskProgressMap is
// rejected as a duplicate.
//
// On any parsing, validation, marshalling or conversion failure the instance
// status is updated to processor.InstanceStatusFailed with the failure
// message, an error carrying the same message is returned, and no request is
// dispatched. On success it returns nil.
func (m *ShardingTaskMaster) SubmitInstance(ctx context.Context, jobInstanceInfo *common.JobInstanceInfo) error {
	// fail records the failure on the instance and returns it as an error.
	// The message is passed as an argument to "%s" instead of being used as
	// the format string itself: messages embed user-supplied sharding values
	// and %+v dumps, so a literal '%' must not be re-interpreted as a verb.
	fail := func(msg string) error {
		m.UpdateNewInstanceStatus(m.GetSerialNum(), processor.InstanceStatusFailed, msg)
		return fmt.Errorf("%s", msg)
	}

	if err := m.parseShardingParameters(jobInstanceInfo); err != nil {
		// Return the original error value untouched so callers can still
		// inspect it.
		m.UpdateNewInstanceStatus(m.GetSerialNum(), processor.InstanceStatusFailed, err.Error())
		return err
	}

	shardingNum := len(m.parameters)
	startContainerRequests := make([]*schedulerx.MasterStartContainerRequest, 0, shardingNum)
	for _, param := range m.parameters {
		// Exactly one '=' is required; values containing '=' are rejected.
		tokens := strings.Split(param, "=")
		if len(tokens) != 2 {
			return fail("invalid sharding parameters, should be like 0=a,1=b,2=c")
		}
		shardingId, err := strconv.Atoi(tokens[0])
		if err != nil {
			return fail(fmt.Sprintf("invalid sharding parameters, shardingId is not digit, shardingId=%s", tokens[0]))
		}
		taskName := tokens[0] // taskName == shardingId
		shardingParameter := tokens[1]

		if _, ok := m.taskProgressMap.Load(taskName); ok {
			return fail(fmt.Sprintf("shardingId=%s is duplicated", taskName))
		}

		task := common.NewShardingTask(int64(shardingId), shardingParameter)
		taskObj, err := json.Marshal(task)
		if err != nil {
			return fail(fmt.Sprintf("json marshal task failed, err=%s, task=%+v", err.Error(), task))
		}
		req, err := m.convert2StartContainerRequest(jobInstanceInfo, int64(shardingId), taskName, taskObj, false)
		if err != nil {
			return fail(fmt.Sprintf("convert2StartContainerRequest failed, err=%s, jobInstanceInfo=%+v", err.Error(), jobInstanceInfo))
		}
		// Every request carries the total shard count of this instance.
		req.ShardingNum = proto.Int32(int32(shardingNum))

		startContainerRequests = append(startContainerRequests, req)
	}

	// Dispatch only after every shard has been validated and converted.
	m.startBatchHandler()
	m.BatchDispatchTasks(startContainerRequests, "")
	m.init()

	return nil
}