odps/instance.go (362 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package odps import ( "encoding/json" "encoding/xml" "io/ioutil" "net/http" "net/url" "strings" "time" "github.com/pkg/errors" "github.com/aliyun/aliyun-odps-go-sdk/odps/common" ) type InstanceStatus int const ( _ InstanceStatus = iota InstanceRunning InstanceSuspended InstanceTerminated InstanceStatusUnknown ) const DefaultJobPriority = 9 type Instance struct { projectName string id string owner string startTime time.Time endTime time.Time status InstanceStatus odpsIns *Odps reLoaded bool resourceUrl string taskNameCommitted string taskResults []TaskResult isSync bool MaxQA struct { isMaxQA bool queryCookie string sessionID string } } // InstanceOrErr is used for the return value of Instances.List type InstanceOrErr struct { Ins *Instance Err error } func NewInstance(odpsIns *Odps, projectName, instanceId string) *Instance { rb := common.ResourceBuilder{ProjectName: projectName} return &Instance{ id: instanceId, projectName: projectName, odpsIns: odpsIns, resourceUrl: rb.Instance(instanceId), } } func (instance *Instance) TaskNameCommitted() string { return instance.taskNameCommitted } func (instance *Instance) ProjectName() string { return instance.projectName } func (instance *Instance) IsLoaded() bool { return instance.reLoaded } func (instance *Instance) IsSync() bool { return instance.isSync } func (instance *Instance) IsAsync() bool { return !instance.isSync } func (instance *Instance) Load() error { client := instance.odpsIns.restClient type ResModel struct { XMLName xml.Name `xml:"Instance"` Status InstanceStatus Results []TaskResult `xml:"Tasks>Task"` } var resModel ResModel resource := instance.resourceUrl headers := make(map[string]string) queryArgs := make(url.Values) if instance.MaxQA.isMaxQA { resource = "/mcqa" + resource headers[common.HttpHeaderMaxQASessionID] = instance.MaxQA.sessionID if instance.MaxQA.queryCookie != "" { headers[common.HttpHeaderMaxQAQueryCookie] = instance.MaxQA.queryCookie } queryArgs.Set("instancestatus", "") } err := client.GetWithParseFunc(resource, queryArgs, headers, func(res *http.Response) error { header := res.Header instance.owner = header.Get(common.HttpHeaderOdpsOwner) instance.startTime, _ = common.ParseRFC1123Date(header.Get(common.HttpHeaderOdpsStartTime)) instance.endTime, _ = common.ParseRFC1123Date(header.Get(common.HttpHeaderOdpsEndTime)) decoder := xml.NewDecoder(res.Body) if err := decoder.Decode(&resModel); err != nil { return errors.WithStack(err) } instance.status = resModel.Status if resModel.Results != nil { instance.taskResults = resModel.Results instance.isSync = true } return nil }) return errors.WithStack(err) } func (instance *Instance) Terminate() error { type BodyModel struct { XMLName xml.Name `xml:"Instance"` Status InstanceStatus } bodyModel := BodyModel{ Status: InstanceTerminated, } resource := instance.resourceUrl headers := make(map[string]string) if instance.MaxQA.isMaxQA { resource = "/mcqa" + resource headers[common.HttpHeaderMaxQASessionID] = instance.MaxQA.sessionID if instance.MaxQA.queryCookie != "" { headers[common.HttpHeaderMaxQAQueryCookie] = instance.MaxQA.queryCookie } } client := instance.odpsIns.restClient err := client.DoXmlWithParseFunc("PUT", resource, nil, headers, &bodyModel, nil) return errors.WithStack(err) } // GetTasks 绝大部分时候返回一个Task(名字与提交的task名字相同),返回多个task的情况我还没有遇到过 func (instance *Instance) GetTasks() ([]TaskInInstance, error) { urlQuery := make(url.Values) urlQuery.Set("taskstatus", "") type ResModel struct { XMLName xml.Name `xml:"Instance"` Tasks []TaskInInstance `xml:"Tasks>Task"` } var resModel ResModel client := instance.odpsIns.restClient err := client.GetWithModel(instance.resourceUrl, urlQuery, nil, &resModel) if err != nil { return nil, errors.WithStack(err) } return resModel.Tasks, nil } func (instance *Instance) GetTaskProgress(taskName string) ([]TaskProgressStage, error) { queryArgs := make(url.Values) queryArgs.Set("instanceprogress", "") queryArgs.Set("taskname", taskName) client := instance.odpsIns.restClient type ResModel struct { XMLName xml.Name `xml:"Progress"` Stages []TaskProgressStage `xml:"Stage"` } var resModel ResModel resource := instance.resourceUrl headers := make(map[string]string) if instance.MaxQA.isMaxQA { resource = "/mcqa" + resource headers[common.HttpHeaderMaxQASessionID] = instance.MaxQA.sessionID if instance.MaxQA.queryCookie != "" { headers[common.HttpHeaderMaxQAQueryCookie] = instance.MaxQA.queryCookie } } err := client.GetWithModel(resource, queryArgs, headers, &resModel) if err != nil { return nil, errors.WithStack(err) } return resModel.Stages, nil } func (instance *Instance) GetTaskDetail(taskName string) ([]byte, error) { queryArgs := make(url.Values, 2) queryArgs.Set("instancedetail", "") queryArgs.Set("taskname", taskName) client := instance.odpsIns.restClient var body []byte resource := instance.resourceUrl headers := make(map[string]string) if instance.MaxQA.isMaxQA { resource = "/mcqa" + resource headers[common.HttpHeaderMaxQASessionID] = instance.MaxQA.sessionID if instance.MaxQA.queryCookie != "" { headers[common.HttpHeaderMaxQAQueryCookie] = instance.MaxQA.queryCookie } } err := client.GetWithParseFunc(resource, queryArgs, headers, func(res *http.Response) error { var err error // Use ioutil.ReadAll instead of io.ReadAll for compatibility with Go 1.15. body, err = ioutil.ReadAll(res.Body) return errors.WithStack(err) }) return body, errors.WithStack(err) } func (instance *Instance) GetTaskSummary(taskName string) (*TaskSummary, error) { queryArgs := make(url.Values, 2) queryArgs.Set("instancesummary", "") queryArgs.Set("taskname", taskName) client := instance.odpsIns.restClient type ResModel struct { Instance struct { JsonSummary string Summary string } } var resModel ResModel err := client.GetWithParseFunc(instance.resourceUrl, queryArgs, nil, func(res *http.Response) error { decoder := json.NewDecoder(res.Body) return errors.WithStack(decoder.Decode(&resModel)) }) if err != nil { return nil, errors.WithStack(err) } taskSummary := TaskSummary{ JsonSummary: resModel.Instance.JsonSummary, Summary: resModel.Instance.Summary, } return &taskSummary, errors.WithStack(err) } func (instance *Instance) GetTaskQuotaJson(taskName string) (string, error) { queryArgs := make(url.Values, 2) queryArgs.Set("instancequota", "") queryArgs.Set("taskname", taskName) client := instance.odpsIns.restClient var body []byte err := client.GetWithParseFunc(instance.resourceUrl, queryArgs, nil, func(res *http.Response) error { var err error // Use ioutil.ReadAll instead of io.ReadAll for compatibility with Go 1.15. body, err = ioutil.ReadAll(res.Body) return errors.WithStack(err) }) if err != nil { return "", errors.WithStack(err) } return string(body), nil } // GetCachedInfo 获取instance cached信息,返回的是json字符串,需要自己进行解析 func (instance *Instance) GetCachedInfo() (string, error) { queryArgs := make(url.Values, 2) queryArgs.Set("cached", "") client := instance.odpsIns.restClient var body []byte err := client.GetWithParseFunc(instance.resourceUrl, queryArgs, nil, func(res *http.Response) error { var err error // Use ioutil.ReadAll instead of io.ReadAll for compatibility with Go 1.15. body, err = ioutil.ReadAll(res.Body) return errors.WithStack(err) }) if err != nil { return "", errors.WithStack(err) } return string(body), nil } func (instance *Instance) Id() string { return instance.id } func (instance *Instance) Owner() string { return instance.owner } func (instance *Instance) Status() InstanceStatus { return instance.status } func (instance *Instance) StartTime() time.Time { return instance.startTime } func (instance *Instance) EndTime() time.Time { return instance.endTime } func (instance *Instance) TaskResults() []TaskResult { return instance.taskResults } func (instance *Instance) WaitForSuccess() error { for { err := instance.Load() if err != nil { return errors.WithStack(err) } tasks, err := instance.GetTasks() if err != nil { return errors.WithStack(err) } if len(tasks) == 0 { time.Sleep(time.Second * 1) continue } success := true for _, task := range tasks { switch task.Status { case TaskFailed, TaskCancelled, TaskSuspended: results, err := instance.GetResult() if err != nil { return errors.Wrapf(err, "get task %s with status %s", task.Name, task.Status) } if len(results) <= 0 { return errors.Errorf("get task %s with status %s", task.Name, task.Status) } return errors.New(results[0].Content()) case TaskSuccess: continue case TaskRunning, TaskWaiting: success = false } } if success { break } time.Sleep(time.Second * 1) } return nil } func (instance *Instance) GetResult() ([]TaskResult, error) { if instance.isSync { return instance.taskResults, nil } if instance.MaxQA.isMaxQA { err := instance.Load() if err != nil { return nil, err } return instance.taskResults, nil } queryArgs := make(url.Values, 1) queryArgs.Set("result", "") client := instance.odpsIns.restClient type ResModel struct { XMLName xml.Name `xml:"Instance"` Tasks []TaskResult `xml:"Tasks>Task"` } var resModel ResModel err := client.GetWithModel(instance.resourceUrl, queryArgs, nil, &resModel) if err != nil { return nil, errors.WithStack(err) } return resModel.Tasks, nil } func InstancesStatusFromStr(s string) InstanceStatus { switch strings.ToLower(s) { case "running": return InstanceRunning case "suspended": return InstanceSuspended case "terminated": return InstanceTerminated default: return InstanceStatusUnknown } } func (status InstanceStatus) String() string { switch status { case InstanceRunning: return "Running" case InstanceSuspended: return "Suspended" case InstanceTerminated: return "Terminated" default: return "InstanceStatusUnknown" } } func (status *InstanceStatus) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error { var s string if err := d.DecodeElement(&s, &start); err != nil { return errors.WithStack(err) } *status = InstancesStatusFromStr(s) return nil } func (status *InstanceStatus) MarshalXML(d *xml.Encoder, start xml.StartElement) error { s := status.String() return errors.WithStack(d.EncodeElement(s, start)) }