odps/sql_task.go (91 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package odps import ( "encoding/csv" "encoding/json" "encoding/xml" "strings" "github.com/pkg/errors" "github.com/aliyun/aliyun-odps-go-sdk/odps/options" "github.com/aliyun/aliyun-odps-go-sdk/odps/common" ) type SQLTask struct { XMLName xml.Name `xml:"SQL"` TaskName `xml:"Name"` TaskConfig Query string } func NewAnonymousSQLTask(query string, hints map[string]string) SQLTask { return NewSqlTask("AnonymousSQLTask", query, hints) } func NewSqlTask(name string, query string, hints map[string]string) SQLTask { sqlTask := SQLTask{ TaskName: TaskName(name), Query: query, } sqlTask.Config = append(sqlTask.Config, common.Property{Name: "type", Value: "sql"}) if hints != nil { hintsJson, _ := json.Marshal(hints) sqlTask.Config = append(sqlTask.Config, common.Property{Name: "settings", Value: string(hintsJson)}) } return sqlTask } // NewSQLTaskWithOptions Create a SQLTask with options.SQLTaskOptions func NewSQLTaskWithOptions(query string, option *options.SQLTaskOptions) SQLTask { if option == nil { return NewAnonymousSQLTask(query, nil) } taskName := option.TaskName if taskName == "" { taskName = "AnonymousSQLTask" } sqlTask := SQLTask{ TaskName: TaskName(taskName), Query: query, } taskType := option.Type if taskType == "" { taskType = "sql" } sqlTask.Config = append(sqlTask.Config, common.Property{Name: "type", Value: taskType}) hints := option.Hints if hints == nil && option.DefaultSchema != "" { hints = make(map[string]string) hints["odps.default.schema"] = option.DefaultSchema } if hints != nil { if hints["odps.default.schema"] == "" && option.DefaultSchema != "" { hints["odps.default.schema"] = option.DefaultSchema } hintsJSON, _ := json.Marshal(hints) sqlTask.Config = append(sqlTask.Config, common.Property{Name: "settings", Value: string(hintsJSON)}) } if option.Aliases != nil { aliasJSON, _ := json.Marshal(option.Aliases) sqlTask.Config = append(sqlTask.Config, common.Property{Name: "aliases", Value: string(aliasJSON)}) } return sqlTask } func (t *SQLTask) TaskType() string { return "SQL" } func (t *SQLTask) RunInOdps(odpsIns *Odps, projectName string) (*Instance, error) { return t.Run(odpsIns, projectName) } func (t *SQLTask) Run(odpsIns *Odps, projectName string) (*Instance, error) { Instances := NewInstances(odpsIns) i, err := Instances.CreateTask(projectName, t) return i, errors.WithStack(err) } // GetSelectResultAsCsv 最多返回1W条数据 func (t *SQLTask) GetSelectResultAsCsv(i *Instance, withColumnName bool) (*csv.Reader, error) { results, err := i.GetResult() if err != nil { return nil, errors.WithStack(err) } if len(results) <= 0 { return nil, errors.Errorf("failed to get result from instance %s", i.Id()) } reader := csv.NewReader(strings.NewReader(results[0].Content())) if !withColumnName { _, _ = reader.Read() } return reader, nil }