odps/tables.go (279 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package odps
import (
"encoding/json"
"encoding/xml"
"fmt"
"net/url"
"strings"
"github.com/pkg/errors"
"github.com/aliyun/aliyun-odps-go-sdk/odps/common"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
)
// Tables is the entry point for working with the tables of one ODPS project,
// scoped to a schema inside that project when schemaName is non-empty.
type Tables struct {
// projectName is the project every request and SQL statement is issued against.
projectName string
// schemaName is the schema inside the project; it may be empty.
schemaName string
// odpsIns is the Odps instance used to talk to the service.
odpsIns *Odps
}
// NewTables builds a Tables bound to odpsIns.
//
// An empty projectName is replaced by odpsIns.DefaultProjectName(), and an
// empty schemaName is replaced by odpsIns.CurrentSchemaName().
func NewTables(odpsIns *Odps, projectName, schemaName string) *Tables {
	ts := &Tables{
		odpsIns:     odpsIns,
		projectName: projectName,
		schemaName:  schemaName,
	}

	// Fall back to the defaults configured on the Odps instance.
	if ts.projectName == "" {
		ts.projectName = odpsIns.DefaultProjectName()
	}
	if ts.schemaName == "" {
		ts.schemaName = odpsIns.CurrentSchemaName()
	}

	return ts
}
// List iterates over the tables of the project/schema and passes each one to f.
//
// Results are fetched page by page: as long as a response carries a non-empty
// Marker, it is sent back as the "marker" query argument to request the next
// page. If a request fails, f is called once with (nil, err) and the iteration
// stops. Tables handed to f only have their name and owner populated from the
// listing response.
//
// The query can be narrowed with the TableFilter helpers (for example
// TableFilter.NamePrefix, TableFilter.Extended, TableFilter.Owner); nil
// filters are ignored.
func (ts *Tables) List(f func(*Table, error), filters ...TFilterFunc) {
	query := make(url.Values, 4)
	query.Set("expectmarker", "true")
	query.Set("curr_schema", ts.schemaName)
	for _, apply := range filters {
		if apply == nil {
			continue
		}
		apply(query)
	}

	// ResModel mirrors one page of the listing response.
	type ResModel struct {
		XMLName  xml.Name     `xml:"Tables"`
		Tables   []tableModel `xml:"Table"`
		Marker   string
		MaxItems int
	}

	builder := common.ResourceBuilder{ProjectName: ts.projectName}
	resource := builder.Tables()
	client := ts.odpsIns.restClient

	for {
		// A fresh value per page keeps entries of earlier pages from leaking
		// into the decoded slice.
		var page ResModel
		if err := client.GetWithModel(resource, query, nil, &page); err != nil {
			f(nil, err)
			return
		}

		for i := range page.Tables {
			entry := &page.Tables[i]
			table := NewTable(ts.odpsIns, ts.projectName, ts.schemaName, entry.Name)
			table.model.Owner = entry.Owner
			f(table, nil)
		}

		if page.Marker == "" {
			return
		}
		query.Set("marker", page.Marker)
	}
}
// BatchLoadTables loads several tables of the current project/schema with a
// single POST request and returns one *Table per entry of the response.
//
// According to the original note on this method, at most 100 tables can be
// fetched per call, and how much information is returned for each table
// depends on the caller's permissions.
//
// Any request or model-conversion error is returned wrapped with a stack
// trace; in that case the returned slice is nil.
func (ts *Tables) BatchLoadTables(tableNames []string) ([]*Table, error) {
	// tableRef identifies one requested table in the request body.
	type tableRef = struct {
		Project string
		Name    string
		Schema  string
	}
	type PostBodyModel struct {
		XMLName xml.Name   `xml:"Tables"`
		Tables  []tableRef `xml:"Table"`
	}
	type ResModel struct {
		XMLName xml.Name `xml:"Tables"`
		Table   []tableModel
	}

	var body PostBodyModel
	for _, name := range tableNames {
		body.Tables = append(body.Tables, tableRef{
			Project: ts.projectName,
			Name:    name,
			Schema:  ts.schemaName,
		})
	}

	query := make(url.Values, 4)
	query.Set("query", "")
	if ts.schemaName != "" {
		query.Set("curr_schema", ts.schemaName)
	}

	builder := common.ResourceBuilder{ProjectName: ts.projectName}
	resource := builder.Tables()
	client := ts.odpsIns.restClient

	var res ResModel
	if err := client.DoXmlWithModel(common.HttpMethod.PostMethod, resource, query, &body, &res); err != nil {
		return nil, errors.WithStack(err)
	}

	tables := make([]*Table, 0, len(res.Table))
	for i := range res.Table {
		table, err := newTableWithModel(ts.odpsIns, &res.Table[i])
		if err != nil {
			return nil, errors.WithStack(err)
		}
		tables = append(tables, table)
	}
	return tables, nil
}
// Get returns a Table for tableName in this project/schema. It simply
// delegates to NewTable with the Odps instance, project and schema of ts.
func (ts *Tables) Get(tableName string) *Table {
	return NewTable(ts.odpsIns, ts.projectName, ts.schemaName, tableName)
}
// Create creates a table described by schema and waits for the SQL task to
// succeed. The schema can be built with tableschema.SchemaBuilder.
//
// createIfNotExists is forwarded to schema.ToSQLString when the statement is
// generated.
//
// hints are SQL settings that affect how the statement is executed (the same
// settings as the `Set` command, e.g. odps.mapred.map.split.size). The caller's
// map is copied and never modified; "odps.namespace.schema" is always set on
// the copy to "true" when a schema name is configured and to "false" otherwise,
// overriding any value supplied by the caller.
//
// alias, when non-nil, is JSON-encoded and attached to the task as the
// "aliases" property; see the reference of the alias command for its meaning.
//
// Every error is returned wrapped with a stack trace.
func (ts *Tables) Create(
	schema tableschema.TableSchema,
	createIfNotExists bool,
	hints, alias map[string]string,
) error {
	sql, err := schema.ToSQLString(ts.projectName, ts.schemaName, createIfNotExists)
	if err != nil {
		return errors.WithStack(err)
	}

	// Copy the hints so that adding the namespace flag does not mutate the
	// map owned by the caller.
	taskHints := make(map[string]string, len(hints)+1)
	for k, v := range hints {
		taskHints[k] = v
	}
	if ts.schemaName == "" {
		taskHints["odps.namespace.schema"] = "false"
	} else {
		taskHints["odps.namespace.schema"] = "true"
	}

	task := NewSqlTask("SQLCreateTableTask", sql, taskHints)

	// TODO rm aliases
	if alias != nil {
		aliasJSON, err := json.Marshal(alias)
		if err != nil {
			return errors.WithStack(err)
		}
		task.AddProperty("aliases", string(aliasJSON))
	}

	instances := NewInstances(ts.odpsIns, ts.projectName)
	ins, err := instances.CreateTask(ts.projectName, &task)
	if err != nil {
		return errors.WithStack(err)
	}

	return errors.WithStack(ins.WaitForSuccess())
}
// CreateExternal creates an external table described by schema and waits for
// the SQL task to succeed. The schema can be built with
// tableschema.SchemaBuilder.
//
// createIfNotExists, serdeProperties and jars are forwarded to
// schema.ToExternalSQLString when the statement is generated.
//
// hints are SQL settings applied to the task. The caller's map is copied and
// never modified; "odps.namespace.schema" is always set on the copy to "true"
// when a schema name is configured and to "false" otherwise, overriding any
// value supplied by the caller.
//
// alias, when non-nil, is JSON-encoded and attached to the task as the
// "aliases" property.
//
// Every error is returned wrapped with a stack trace.
func (ts *Tables) CreateExternal(
	schema tableschema.TableSchema,
	createIfNotExists bool,
	serdeProperties map[string]string,
	jars []string,
	hints, alias map[string]string,
) error {
	sql, err := schema.ToExternalSQLString(ts.projectName, ts.schemaName, createIfNotExists, serdeProperties, jars)
	if err != nil {
		return errors.WithStack(err)
	}

	// Copy the hints so that adding the namespace flag does not mutate the
	// map owned by the caller.
	taskHints := make(map[string]string, len(hints)+1)
	for k, v := range hints {
		taskHints[k] = v
	}
	if ts.schemaName == "" {
		taskHints["odps.namespace.schema"] = "false"
	} else {
		taskHints["odps.namespace.schema"] = "true"
	}

	task := NewSqlTask("SQLCreateExternalTableTask", sql, taskHints)

	if alias != nil {
		// The "aliases" property must carry the alias map itself, not the
		// SQL hints.
		aliasJSON, err := json.Marshal(alias)
		if err != nil {
			return errors.WithStack(err)
		}
		task.AddProperty("aliases", string(aliasJSON))
	}

	instances := NewInstances(ts.odpsIns, ts.projectName)
	ins, err := instances.CreateTask(ts.projectName, &task)
	if err != nil {
		return errors.WithStack(err)
	}

	return errors.WithStack(ins.WaitForSuccess())
}
// CreateView creates a view described by schema and waits for the SQL task to
// succeed.
//
// orReplace, createIfNotExists and buildDeferred are forwarded to
// schema.ToViewSQLString when the statement is generated.
//
// Like Create, CreateExternal, CreateWithDataHub and Delete, the task carries
// the "odps.namespace.schema" hint: "true" when a schema name is configured
// and "false" otherwise, so the generated name is resolved the same way as in
// those methods.
//
// Every error is returned wrapped with a stack trace.
func (ts *Tables) CreateView(schema tableschema.TableSchema,
	orReplace,
	createIfNotExists,
	buildDeferred bool,
) error {
	sql, err := schema.ToViewSQLString(ts.projectName, ts.schemaName, orReplace, createIfNotExists, buildDeferred)
	if err != nil {
		return errors.WithStack(err)
	}

	hints := make(map[string]string, 1)
	if ts.schemaName == "" {
		hints["odps.namespace.schema"] = "false"
	} else {
		hints["odps.namespace.schema"] = "true"
	}

	task := NewSqlTask("SQLCreateViewTask", sql, hints)

	instances := NewInstances(ts.odpsIns, ts.projectName)
	ins, err := instances.CreateTask(ts.projectName, &task)
	if err != nil {
		return errors.WithStack(err)
	}

	return errors.WithStack(ins.WaitForSuccess())
}
// CreateWithDataHub creates a table described by schema together with its
// DataHub settings and waits for the SQL task to succeed.
//
// The statement is the base SQL produced by schema.ToBaseSQLString, followed
// by an optional "lifecycle" clause (only when schema.Lifecycle > 0), an
// "into <shardNum> shards" clause and a "hubLifecycle <hubLifecycle>" clause.
// The "odps.namespace.schema" hint is "true" when a schema name is configured
// and "false" otherwise.
//
// Every error is returned wrapped with a stack trace.
func (ts *Tables) CreateWithDataHub(
	schema tableschema.TableSchema,
	createIfNotExists bool,
	shardNum,
	hubLifecycle int,
) error {
	baseSQL, err := schema.ToBaseSQLString(ts.projectName, ts.schemaName, createIfNotExists, false)
	if err != nil {
		return errors.WithStack(err)
	}

	var stmt strings.Builder
	stmt.WriteString(baseSQL)
	if schema.Lifecycle > 0 {
		fmt.Fprintf(&stmt, "\nlifecycle %d", schema.Lifecycle)
	}
	fmt.Fprintf(&stmt, "\ninto %d shards", shardNum)
	fmt.Fprintf(&stmt, "\nhubLifecycle %d", hubLifecycle)
	stmt.WriteByte(';')

	namespaceSchema := "true"
	if ts.schemaName == "" {
		namespaceSchema = "false"
	}
	hints := map[string]string{"odps.namespace.schema": namespaceSchema}

	task := NewSqlTask("SQLCreateTableTaskWithDataHub", stmt.String(), hints)

	instances := NewInstances(ts.odpsIns, ts.projectName)
	ins, err := instances.CreateTask(ts.projectName, &task)
	if err != nil {
		return errors.WithStack(err)
	}

	return errors.WithStack(ins.WaitForSuccess())
}
// Delete drops tableName from this project/schema and waits for the SQL task
// to succeed.
//
// The statement has the form
//
//	drop table [if exists] <project>.[`<schema>`.]`<table>`;
//
// where "if exists" is emitted only when ifExists is true and the schema part
// only when a schema name is configured. The "odps.namespace.schema" hint is
// "true" when a schema name is configured and "false" otherwise.
//
// Every error is returned wrapped with a stack trace.
func (ts *Tables) Delete(tableName string, ifExists bool) error {
	namespaceSchema := "false"

	// Qualified name: the project is written as-is, schema and table are
	// back-quoted.
	path := []string{ts.projectName}
	if ts.schemaName != "" {
		namespaceSchema = "true"
		path = append(path, "`"+ts.schemaName+"`")
	}
	path = append(path, "`"+tableName+"`")

	verb := "drop table"
	if ifExists {
		verb += " if exists"
	}
	stmt := verb + " " + strings.Join(path, ".") + ";"

	hints := map[string]string{"odps.namespace.schema": namespaceSchema}
	task := NewSqlTask("SQLDropTableTask", stmt, hints)

	instances := NewInstances(ts.odpsIns, ts.projectName)
	ins, err := instances.CreateTask(ts.projectName, &task)
	if err != nil {
		return errors.WithStack(err)
	}

	return errors.WithStack(ins.WaitForSuccess())
}
// TFilterFunc adds or changes query arguments of a table listing request.
// Tables.List calls every non-nil TFilterFunc it is given with the request's
// url.Values before the first request is sent.
type TFilterFunc func(url.Values)
// TableFilter groups the constructors for the TFilterFunc values accepted by
// Tables.List. Each constructor returns a function that sets a single query
// argument on the listing request.
var TableFilter = struct {
	// Extended sets the "extended" query argument (with an empty value),
	// which controls whether extended table information is requested.
	Extended func() TFilterFunc
	// NamePrefix sets the "name" query argument, used to select tables by
	// name prefix.
	NamePrefix func(string) TFilterFunc
	// Owner sets the "owner" query argument, used to select tables by owner
	// name.
	Owner func(string) TFilterFunc
	// Type sets the "type" query argument to tableType.String(), used to
	// select tables by table type.
	Type func(TableType) TFilterFunc
}{
	Extended: func() TFilterFunc {
		return func(q url.Values) { q.Set("extended", "") }
	},
	NamePrefix: func(prefix string) TFilterFunc {
		return func(q url.Values) { q.Set("name", prefix) }
	},
	Owner: func(ownerName string) TFilterFunc {
		return func(q url.Values) { q.Set("owner", ownerName) }
	},
	Type: func(kind TableType) TFilterFunc {
		return func(q url.Values) { q.Set("type", kind.String()) }
	},
}