in odps/instances.go [68:208]
func (instances *Instances) CreateTask(projectName string, task Task, createInstanceOptions ...*options.CreateInstanceOptions) (*Instance, error) {
var instanceOptions *options.CreateInstanceOptions
if len(createInstanceOptions) != 0 {
instanceOptions = createInstanceOptions[0]
}
if instanceOptions == nil {
instanceOptions = options.NewCreateInstanceOptions()
}
if projectName == "" {
projectName = instances.projectName
}
jobPriority := instanceOptions.Priority
if jobPriority == 0 {
jobPriority = DefaultJobPriority
}
uuidStr := uuid.New().String()
task.AddProperty("uuid", uuidStr)
// The order of each field is strictly ordered
type InstanceCreationModel struct {
XMLName xml.Name `xml:"Instance"`
Job struct {
Name string `xml:"Name,omitempty"`
Priority int
UniqueIdentifyID string `xml:"Guid,omitempty"`
Tasks Task `xml:"Tasks>Task"`
}
}
instanceCreationModel := InstanceCreationModel{
Job: struct {
Name string `xml:"Name,omitempty"`
Priority int
UniqueIdentifyID string `xml:"Guid,omitempty"`
Tasks Task `xml:"Tasks>Task"`
}{
Name: instanceOptions.JobName,
Priority: jobPriority,
UniqueIdentifyID: instanceOptions.UniqueIdentifyID,
Tasks: task,
},
}
type ResModel struct {
XMLName xml.Name `xml:"Instance"`
Tasks []TaskResult `xml:"Tasks>Task"`
}
var resModel ResModel
client := instances.odpsIns.restClient
rb := common.ResourceBuilder{}
rb.SetProject(projectName)
resource := rb.Instances()
queryArg := make(url.Values)
if instanceOptions.TryWait {
queryArg.Set("tryWait", "")
}
var instanceId string
headers := make(map[string]string)
maxqaOptions := instanceOptions.MaxQAOptions
if maxqaOptions.UseMaxQA {
if maxqaOptions.SessionID == "" && maxqaOptions.QuotaName == "" {
return nil, errors.New("The MaxQA job must provide a SessionID or QuotaName.")
}
if maxqaOptions.SessionID == "" {
id, err := getMaxQASessionID(instances.odpsIns, maxqaOptions.QuotaName, projectName)
if err != nil {
return nil, err
}
maxqaOptions.SessionID = id
}
resource = "/mcqa" + resource
headers[common.HttpHeaderMaxQASessionID] = maxqaOptions.SessionID
}
var maxqaQueryCookie string
startTime := time.Now()
maxRetryDuration := 180 * time.Second
// 循环,直到达到最大重试时间
for {
err := client.DoXmlWithParseFunc(common.HttpMethod.PostMethod, resource, queryArg, headers, &instanceCreationModel, func(res *http.Response) error {
location := res.Header.Get(common.HttpHeaderLocation)
if location == "" {
return errors.New("invalid response, Location header required")
}
splitAt := strings.LastIndex(location, "/")
if splitAt < 0 || splitAt == len(location)-1 {
return errors.New("invalid response, value of Location header is invalid")
}
instanceId = location[splitAt+1:]
if res.StatusCode == 409 {
return restclient.NewHttpNotOk(res)
}
if res.StatusCode == 201 {
maxqaQueryCookie = res.Header.Get(common.HttpHeaderMaxQAQueryCookie)
return nil
}
decoder := xml.NewDecoder(res.Body)
return errors.WithStack(decoder.Decode(&resModel))
})
if err != nil {
if time.Since(startTime) >= maxRetryDuration {
return nil, err
}
var httpErr restclient.HttpError
if errors.As(err, &httpErr) && httpErr.Response.StatusCode == 409 {
retryAfter := httpErr.Response.Header.Get("Retry-After")
if retryAfter != "" {
retryAfterInt, ioErr := strconv.Atoi(retryAfter)
if ioErr != nil {
retryAfterInt = 5
}
time.Sleep(time.Second * time.Duration(retryAfterInt))
} else {
time.Sleep(time.Second * 5)
}
continue
}
return nil, err
}
instance := NewInstance(instances.odpsIns, projectName, instanceId)
instance.taskNameCommitted = task.GetName()
instance.taskResults = resModel.Tasks
instance.isSync = resModel.Tasks != nil && len(resModel.Tasks) > 0
if maxqaOptions.UseMaxQA {
instance.MaxQA.isMaxQA = maxqaOptions.UseMaxQA
instance.MaxQA.sessionID = maxqaOptions.SessionID
instance.MaxQA.queryCookie = maxqaQueryCookie
}
return instance, nil
}
}