odps/instances.go (297 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package odps import ( "encoding/xml" "fmt" "net/http" "net/url" "strconv" "strings" "time" "github.com/google/uuid" "github.com/pkg/errors" "github.com/aliyun/aliyun-odps-go-sdk/odps/restclient" "github.com/aliyun/aliyun-odps-go-sdk/odps/common" "github.com/aliyun/aliyun-odps-go-sdk/odps/options" ) // Instances is used to get or create instance(s) type Instances struct { projectName string odpsIns *Odps } // NewInstances create Instances object, if the projectName is not set, // the default project name of odpsIns will be used func NewInstances(odpsIns *Odps, projectName ...string) *Instances { var _projectName string if len(projectName) == 0 { _projectName = odpsIns.DefaultProjectName() } else { _projectName = projectName[0] } return &Instances{ projectName: _projectName, odpsIns: odpsIns, } } // CreateTaskWithPriority Create a Task (maybe SQLTask) with specified priority func (instances *Instances) CreateTaskWithPriority(projectName string, task Task, jobPriority int) (*Instance, error) { instanceOptions := options.NewCreateInstanceOptions() instanceOptions.Priority = jobPriority return instances.CreateTask(projectName, task, instanceOptions) } // CreateTask Create a Task (maybe SQLTask) with options.CreateInstanceOptions func (instances *Instances) CreateTask(projectName string, task Task, createInstanceOptions ...*options.CreateInstanceOptions) (*Instance, error) { var instanceOptions *options.CreateInstanceOptions if len(createInstanceOptions) != 0 { instanceOptions = createInstanceOptions[0] } if instanceOptions == nil { instanceOptions = options.NewCreateInstanceOptions() } if projectName == "" { projectName = instances.projectName } jobPriority := instanceOptions.Priority if jobPriority == 0 { jobPriority = DefaultJobPriority } uuidStr := uuid.New().String() task.AddProperty("uuid", uuidStr) // The order of each field is strictly ordered type InstanceCreationModel struct { XMLName xml.Name `xml:"Instance"` Job struct { Name string `xml:"Name,omitempty"` Priority int UniqueIdentifyID string `xml:"Guid,omitempty"` Tasks Task `xml:"Tasks>Task"` } } instanceCreationModel := InstanceCreationModel{ Job: struct { Name string `xml:"Name,omitempty"` Priority int UniqueIdentifyID string `xml:"Guid,omitempty"` Tasks Task `xml:"Tasks>Task"` }{ Name: instanceOptions.JobName, Priority: jobPriority, UniqueIdentifyID: instanceOptions.UniqueIdentifyID, Tasks: task, }, } type ResModel struct { XMLName xml.Name `xml:"Instance"` Tasks []TaskResult `xml:"Tasks>Task"` } var resModel ResModel client := instances.odpsIns.restClient rb := common.ResourceBuilder{} rb.SetProject(projectName) resource := rb.Instances() queryArg := make(url.Values) if instanceOptions.TryWait { queryArg.Set("tryWait", "") } var instanceId string headers := make(map[string]string) maxqaOptions := instanceOptions.MaxQAOptions if maxqaOptions.UseMaxQA { if maxqaOptions.SessionID == "" && maxqaOptions.QuotaName == "" { return nil, errors.New("The MaxQA job must provide a SessionID or QuotaName.") } if maxqaOptions.SessionID == "" { id, err := getMaxQASessionID(instances.odpsIns, maxqaOptions.QuotaName, projectName) if err != nil { return nil, err } maxqaOptions.SessionID = id } resource = "/mcqa" + resource headers[common.HttpHeaderMaxQASessionID] = maxqaOptions.SessionID } var maxqaQueryCookie string startTime := time.Now() maxRetryDuration := 180 * time.Second // 循环,直到达到最大重试时间 for { err := client.DoXmlWithParseFunc(common.HttpMethod.PostMethod, resource, queryArg, headers, &instanceCreationModel, func(res *http.Response) error { location := res.Header.Get(common.HttpHeaderLocation) if location == "" { return errors.New("invalid response, Location header required") } splitAt := strings.LastIndex(location, "/") if splitAt < 0 || splitAt == len(location)-1 { return errors.New("invalid response, value of Location header is invalid") } instanceId = location[splitAt+1:] if res.StatusCode == 409 { return restclient.NewHttpNotOk(res) } if res.StatusCode == 201 { maxqaQueryCookie = res.Header.Get(common.HttpHeaderMaxQAQueryCookie) return nil } decoder := xml.NewDecoder(res.Body) return errors.WithStack(decoder.Decode(&resModel)) }) if err != nil { if time.Since(startTime) >= maxRetryDuration { return nil, err } var httpErr restclient.HttpError if errors.As(err, &httpErr) && httpErr.Response.StatusCode == 409 { retryAfter := httpErr.Response.Header.Get("Retry-After") if retryAfter != "" { retryAfterInt, ioErr := strconv.Atoi(retryAfter) if ioErr != nil { retryAfterInt = 5 } time.Sleep(time.Second * time.Duration(retryAfterInt)) } else { time.Sleep(time.Second * 5) } continue } return nil, err } instance := NewInstance(instances.odpsIns, projectName, instanceId) instance.taskNameCommitted = task.GetName() instance.taskResults = resModel.Tasks instance.isSync = resModel.Tasks != nil && len(resModel.Tasks) > 0 if maxqaOptions.UseMaxQA { instance.MaxQA.isMaxQA = maxqaOptions.UseMaxQA instance.MaxQA.sessionID = maxqaOptions.SessionID instance.MaxQA.queryCookie = maxqaQueryCookie } return instance, nil } } // List Get all instances, the filters can be given with InstanceFilter.Status, InstanceFilter.OnlyOwner, // InstanceFilter.QuotaIndex, InstanceFilter.TimeRange func (instances *Instances) List(f func(*Instance), filters ...InsFilterFunc) error { queryArgs := make(url.Values) queryArgs.Set("onlyowner", "no") for _, filter := range filters { filter(queryArgs) } client := instances.odpsIns.restClient rb := common.ResourceBuilder{ProjectName: instances.projectName} resources := rb.Instances() type ResModel struct { XMLName xml.Name `xml:"Instances"` Marker string MaxItems int Instances []struct { Name string Owner string StartTime common.GMTTime EndTime common.GMTTime `xml:"EndTime"` Status InstanceStatus } `xml:"Instance"` } var resModel ResModel for { err := client.GetWithModel(resources, queryArgs, nil, &resModel) if err != nil { return err } for _, model := range resModel.Instances { instance := NewInstance(instances.odpsIns, instances.projectName, model.Name) instance.startTime = time.Time(model.StartTime) instance.endTime = time.Time(model.EndTime) instance.status = model.Status instance.owner = model.Owner f(instance) } if resModel.Marker != "" { queryArgs.Set("marker", resModel.Marker) resModel = ResModel{} } else { break } } return nil } // ListInstancesQueued Get all instance Queued information, the information is in json string,you need parse it yourself。 // The filters can be given with InstanceFilter.Status, InstanceFilter.OnlyOwner, InstanceFilter.QuotaIndex, // InstanceFilter.TimeRange func (instances *Instances) ListInstancesQueued(filters ...InsFilterFunc) ([]string, error) { queryArgs := make(url.Values) queryArgs.Set("onlyowner", "no") for _, filter := range filters { filter(queryArgs) } client := instances.odpsIns.restClient rb := common.ResourceBuilder{ProjectName: instances.projectName} resources := rb.CachedInstances() type ResModel struct { XMLName xml.Name `xml:"Instances"` Marker string MaxItems int Content string } var resModel ResModel var insList []string for { err := client.GetWithModel(resources, queryArgs, nil, &resModel) if err != nil { return insList, errors.WithStack(err) } if resModel.Content == "" { break } insList = append(insList, resModel.Content) if resModel.Marker != "" { queryArgs.Set("marker", resModel.Marker) resModel = ResModel{} } else { break } } return insList, nil } type InsFilterFunc func(values url.Values) var InstanceFilter = struct { // Only get instances with a given status Status func(InstanceStatus) InsFilterFunc // Only get instances that create by the current account OnlyOwner func() InsFilterFunc // Instance 运行所在 quota 组过滤条件 QuotaIndex func(string) InsFilterFunc // Get instances running between start and end times TimeRange func(time.Time, time.Time) InsFilterFunc }{ Status: func(status InstanceStatus) InsFilterFunc { return func(values url.Values) { if status != 0 { values.Set("status", status.String()) } } }, OnlyOwner: func() InsFilterFunc { return func(values url.Values) { values.Set("onlyowner", "yes") } }, QuotaIndex: func(s string) InsFilterFunc { return func(values url.Values) { values.Set("quotaindex", s) } }, TimeRange: func(s time.Time, e time.Time) InsFilterFunc { return func(values url.Values) { startTime := strconv.FormatInt(s.Unix(), 10) endTime := strconv.FormatInt(e.Unix(), 10) dateRange := fmt.Sprintf("%s:%s", startTime, endTime) values.Set("daterange", dateRange) } }, } func (instances *Instances) Get(instanceId string) *Instance { return NewInstance(instances.odpsIns, instances.projectName, instanceId) } func getMaxQASessionID(ins *Odps, quotaName string, projectName string) (string, error) { tenantId := ins.Project(projectName).TenantId() resource := "/quotas/" + quotaName queryArgs := make(url.Values, 4) queryArgs.Set("project", projectName) queryArgs.Set("version", "wlm") queryArgs.Set("tenant", tenantId) request, err := ins.restClient.NewRequestWithUrlQuery(common.HttpMethod.GetMethod, resource, nil, queryArgs) if err != nil { return "", err } response, err := ins.restClient.Do(request) if err != nil { return "", err } if response.StatusCode/100 != 2 { return "", restclient.NewHttpNotOk(response) } maxqaSessionId := response.Header.Get(common.HttpHeaderMaxQASessionID) return maxqaSessionId, nil }