func()

in odps/instances.go [68:208]


// CreateTask submits task as a new instance and returns a handle to it.
//
// An empty projectName falls back to instances.projectName. Only the first
// element of createInstanceOptions is consulted; when it is missing or nil,
// options.NewCreateInstanceOptions() supplies the options. A zero Priority is
// replaced by DefaultJobPriority. A freshly generated UUID is attached to the
// task as its "uuid" property before submission.
//
// When MaxQAOptions.UseMaxQA is set, the request is sent to the
// "/mcqa"-prefixed resource and carries the session ID header. If no
// SessionID was supplied, one is obtained through getMaxQASessionID using
// QuotaName; supplying neither is an error.
//
// HTTP 409 responses are retried for at most 180 seconds measured from the
// first attempt. The wait between attempts is taken from the Retry-After
// header (whole seconds) and defaults to 5 seconds when the header is absent,
// unparsable or negative. If the requested wait does not fit into the
// remaining retry budget, the 409 error is returned instead of sleeping past
// the budget. Every other error is returned immediately.
func (instances *Instances) CreateTask(projectName string, task Task, createInstanceOptions ...*options.CreateInstanceOptions) (*Instance, error) {
	const (
		// maxRetryDuration bounds the total time spent retrying 409 responses.
		maxRetryDuration = 180 * time.Second
		// defaultRetryDelay is used when Retry-After gives no usable value.
		defaultRetryDelay = 5 * time.Second
	)

	var instanceOptions *options.CreateInstanceOptions
	if len(createInstanceOptions) != 0 {
		instanceOptions = createInstanceOptions[0]
	}
	if instanceOptions == nil {
		instanceOptions = options.NewCreateInstanceOptions()
	}
	if projectName == "" {
		projectName = instances.projectName
	}
	jobPriority := instanceOptions.Priority
	if jobPriority == 0 {
		jobPriority = DefaultJobPriority
	}

	uuidStr := uuid.New().String()
	task.AddProperty("uuid", uuidStr)

	// The order of each field is strictly ordered
	type InstanceCreationModel struct {
		XMLName xml.Name `xml:"Instance"`
		Job     struct {
			Name             string `xml:"Name,omitempty"`
			Priority         int
			UniqueIdentifyID string `xml:"Guid,omitempty"`
			Tasks            Task   `xml:"Tasks>Task"`
		}
	}

	// Fill the nested Job struct field by field so its anonymous type does
	// not have to be spelled out a second time.
	var instanceCreationModel InstanceCreationModel
	instanceCreationModel.Job.Name = instanceOptions.JobName
	instanceCreationModel.Job.Priority = jobPriority
	instanceCreationModel.Job.UniqueIdentifyID = instanceOptions.UniqueIdentifyID
	instanceCreationModel.Job.Tasks = task

	type ResModel struct {
		XMLName xml.Name     `xml:"Instance"`
		Tasks   []TaskResult `xml:"Tasks>Task"`
	}
	var resModel ResModel

	client := instances.odpsIns.restClient
	rb := common.ResourceBuilder{}
	rb.SetProject(projectName)
	resource := rb.Instances()

	queryArg := make(url.Values)
	if instanceOptions.TryWait {
		queryArg.Set("tryWait", "")
	}
	var instanceId string

	headers := make(map[string]string)

	maxqaOptions := instanceOptions.MaxQAOptions
	if maxqaOptions.UseMaxQA {
		if maxqaOptions.SessionID == "" && maxqaOptions.QuotaName == "" {
			return nil, errors.New("The MaxQA job must provide a SessionID or QuotaName.")
		}
		if maxqaOptions.SessionID == "" {
			id, err := getMaxQASessionID(instances.odpsIns, maxqaOptions.QuotaName, projectName)
			if err != nil {
				return nil, err
			}
			maxqaOptions.SessionID = id
		}
		resource = "/mcqa" + resource
		headers[common.HttpHeaderMaxQASessionID] = maxqaOptions.SessionID
	}
	var maxqaQueryCookie string

	startTime := time.Now()

	// Keep submitting until the request succeeds, fails with a non-retryable
	// error, or the retry budget is used up.
	for {
		err := client.DoXmlWithParseFunc(common.HttpMethod.PostMethod, resource, queryArg, headers, &instanceCreationModel, func(res *http.Response) error {
			// Report a conflict before validating the Location header, so a
			// 409 that carries no Location still surfaces as a retryable
			// HTTP error rather than as an "invalid response".
			// NOTE(review): whether the REST client ever hands a 409 to this
			// callback is not visible here — confirm against restclient.
			if res.StatusCode == http.StatusConflict {
				return restclient.NewHttpNotOk(res)
			}

			location := res.Header.Get(common.HttpHeaderLocation)
			if location == "" {
				return errors.New("invalid response, Location header required")
			}

			// The instance ID is the last path segment of the Location value.
			splitAt := strings.LastIndex(location, "/")
			if splitAt < 0 || splitAt == len(location)-1 {
				return errors.New("invalid response, value of Location header is invalid")
			}
			instanceId = location[splitAt+1:]

			if res.StatusCode == http.StatusCreated {
				maxqaQueryCookie = res.Header.Get(common.HttpHeaderMaxQAQueryCookie)
				return nil
			}
			// Any other status handed to this callback has its body decoded
			// as the task results.
			return errors.WithStack(xml.NewDecoder(res.Body).Decode(&resModel))
		})
		if err == nil {
			break
		}

		// Only 409 responses are retried.
		var httpErr restclient.HttpError
		if !errors.As(err, &httpErr) || httpErr.Response.StatusCode != http.StatusConflict {
			return nil, err
		}

		remaining := maxRetryDuration - time.Since(startTime)
		if remaining <= 0 {
			return nil, err
		}

		delay := defaultRetryDelay
		if retryAfter := httpErr.Response.Header.Get("Retry-After"); retryAfter != "" {
			// Negative or unparsable values fall back to the default delay;
			// a negative sleep would return at once and spin the loop.
			if seconds, convErr := strconv.Atoi(retryAfter); convErr == nil && seconds >= 0 {
				// Compare in whole seconds before converting, so a huge
				// value cannot overflow time.Duration.
				if seconds > int(maxRetryDuration/time.Second) {
					return nil, err
				}
				delay = time.Duration(seconds) * time.Second
			}
		}
		// Do not block the caller past the retry budget.
		if delay > remaining {
			return nil, err
		}
		time.Sleep(delay)
	}

	instance := NewInstance(instances.odpsIns, projectName, instanceId)
	instance.taskNameCommitted = task.GetName()
	instance.taskResults = resModel.Tasks
	// Task results in the creation response mark the instance as synchronous.
	instance.isSync = len(resModel.Tasks) > 0
	if maxqaOptions.UseMaxQA {
		instance.MaxQA.isMaxQA = maxqaOptions.UseMaxQA
		instance.MaxQA.sessionID = maxqaOptions.SessionID
		instance.MaxQA.queryCookie = maxqaQueryCookie
	}
	return instance, nil
}