odps/tableschema/table_schema.go (600 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package tableschema import ( "bytes" "encoding/json" "fmt" "strconv" "strings" "text/template" "github.com/pkg/errors" "github.com/aliyun/aliyun-odps-go-sdk/odps/data" "github.com/aliyun/aliyun-odps-go-sdk/arrow" "github.com/aliyun/aliyun-odps-go-sdk/odps/common" ) type TableSchema struct { TableName string Columns []Column Comment string CreateTime common.GMTTime ExtendedLabel []string HubLifecycle int IsExternal bool IsMaterializedView bool IsMaterializedViewRewriteEnabled bool IsMaterializedViewOutdated bool IsVirtualView bool LastDDLTime common.GMTTime LastModifiedTime common.GMTTime LastAccessTime common.GMTTime Lifecycle int Owner string PartitionColumns []Column `json:"PartitionKeys"` RecordNum int ShardExist bool ShardInfo string Size int64 TableLabel string ViewText string ViewExpandedText string // extended schema, got by adding "?extended" to table api FileNum int IsArchived bool PhysicalSize int Reserved string // reserved json string, 字段不固定 PrimaryKeys []string Transactional bool // for external table extended info StorageHandler string Location string resources string SerDeProperties map[string]string `json:"-"` Props string MvProperties map[string]string `json:"-"` // materialized view properties RefreshHistory string // for clustered info TblProperties map[string]string `json:"-"` ClusterInfo ClusterInfo } type reservedJSON struct { ClusterInfo Transactional string `json:"Transactional"` PrimaryKeys []string `json:"PrimaryKey"` } type ClusterType = string // ClusterInfo 聚簇信息 type ClusterInfo struct { ClusterType ClusterType ClusterCols []string SortCols []SortColumn BucketNum int } var CLUSTER_TYPE = struct { Hash ClusterType Range ClusterType }{ Hash: "hash", Range: "range", } type SortOrder string var SORT_ORDER = struct { ASC SortOrder DESC SortOrder }{ ASC: "asc", DESC: "desc", } type SortColumn struct { Name string Order SortOrder } type SchemaBuilder struct { name string comment string columns []Column partitionColumns []Column storageHandler string location string viewText string lifecycle int primaryKeys []string tblProperties map[string]string clusterInfo ClusterInfo isVirtualView bool isMaterializedView bool isMaterializedViewRewriteEnabled bool mvProperties map[string]string } func NewSchemaBuilder() *SchemaBuilder { return &SchemaBuilder{} } func (builder *SchemaBuilder) Name(name string) *SchemaBuilder { builder.name = name return builder } func (builder *SchemaBuilder) Comment(comment string) *SchemaBuilder { builder.comment = comment return builder } func (builder *SchemaBuilder) Column(column Column) *SchemaBuilder { builder.columns = append(builder.columns, column) return builder } func (builder *SchemaBuilder) PartitionColumn(column Column) *SchemaBuilder { builder.partitionColumns = append(builder.partitionColumns, column) return builder } func (builder *SchemaBuilder) Columns(columns ...Column) *SchemaBuilder { builder.columns = append(builder.columns, columns...) return builder } func (builder *SchemaBuilder) PartitionColumns(columns ...Column) *SchemaBuilder { builder.partitionColumns = append(builder.partitionColumns, columns...) return builder } func (builder *SchemaBuilder) StorageHandler(storageHandler string) *SchemaBuilder { builder.storageHandler = storageHandler return builder } func (builder *SchemaBuilder) Location(location string) *SchemaBuilder { builder.location = location return builder } // Lifecycle 表的生命周期,仅支持正整数。单位:天 func (builder *SchemaBuilder) Lifecycle(lifecycle int) *SchemaBuilder { builder.lifecycle = lifecycle return builder } // TblProperties 表的属性,key-value 形式 func (builder *SchemaBuilder) TblProperties(properties map[string]string) *SchemaBuilder { builder.tblProperties = properties return builder } func (builder *SchemaBuilder) ClusterType(clusterType ClusterType) *SchemaBuilder { builder.clusterInfo.ClusterType = clusterType return builder } func (builder *SchemaBuilder) ClusterColumns(clusterCols []string) *SchemaBuilder { builder.clusterInfo.ClusterCols = clusterCols return builder } func (builder *SchemaBuilder) ClusterSortColumns(clusterSortCols []SortColumn) *SchemaBuilder { builder.clusterInfo.SortCols = clusterSortCols return builder } func (builder *SchemaBuilder) ClusterBucketNum(bucketNum int) *SchemaBuilder { builder.clusterInfo.BucketNum = bucketNum return builder } func (builder *SchemaBuilder) IsMaterializedView(isMaterializedView bool) *SchemaBuilder { builder.isMaterializedView = isMaterializedView return builder } func (builder *SchemaBuilder) IsMaterializedViewRewriteEnabled(isMaterializedViewRewriteEnabled bool) *SchemaBuilder { builder.isMaterializedViewRewriteEnabled = isMaterializedViewRewriteEnabled return builder } func (builder *SchemaBuilder) IsVirtualView(isVirtualView bool) *SchemaBuilder { builder.isVirtualView = isVirtualView return builder } func (builder *SchemaBuilder) ViewText(viewText string) *SchemaBuilder { builder.viewText = viewText return builder } func (builder *SchemaBuilder) MvProperty(key, value string) *SchemaBuilder { if builder.mvProperties == nil { builder.mvProperties = make(map[string]string) } builder.mvProperties[key] = value return builder } func (builder *SchemaBuilder) MvProperties(properties map[string]string) *SchemaBuilder { builder.mvProperties = properties return builder } // PrimaryKeys specify primary keys of the table func (builder *SchemaBuilder) PrimaryKeys(primaryKeys []string) *SchemaBuilder { builder.primaryKeys = primaryKeys return builder } func (builder *SchemaBuilder) Build() TableSchema { return TableSchema{ TableName: builder.name, Columns: builder.columns, PartitionColumns: builder.partitionColumns, Comment: builder.comment, Lifecycle: builder.lifecycle, StorageHandler: builder.storageHandler, Location: builder.location, ClusterInfo: builder.clusterInfo, TblProperties: builder.tblProperties, PrimaryKeys: builder.primaryKeys, IsVirtualView: builder.isVirtualView, IsMaterializedView: builder.isMaterializedView, IsMaterializedViewRewriteEnabled: builder.isMaterializedViewRewriteEnabled, ViewText: builder.viewText, MvProperties: builder.mvProperties, } } func (schema *TableSchema) UnmarshalJSON(data []byte) error { type Alias TableSchema tempSchema := &struct { SerDePropertiesString *string `json:"serDeProperties,omitempty"` MvPropertiesString *string `json:"mvProperties,omitempty"` *Alias }{ Alias: (*Alias)(schema), } if err := json.Unmarshal(data, &tempSchema); err != nil { return err } if tempSchema.SerDePropertiesString != nil { err := json.Unmarshal([]byte(*tempSchema.SerDePropertiesString), &schema.SerDeProperties) if err != nil { return err } } if tempSchema.MvPropertiesString != nil { err := json.Unmarshal([]byte(*tempSchema.MvPropertiesString), &schema.MvProperties) if err != nil { return err } } if tempSchema.Reserved != "" { var reservedData reservedJSON err := json.Unmarshal([]byte(tempSchema.Reserved), &reservedData) if err != nil { return err } schema.ClusterInfo = reservedData.ClusterInfo schema.PrimaryKeys = reservedData.PrimaryKeys schema.Transactional = common.StringToBool(reservedData.Transactional) } return nil } func (schema *TableSchema) ToBaseSQLString(projectName string, schemaName string, createIfNotExists, isExternal bool) (string, error) { if schema.TableName == "" { return "", errors.New("table name is not set") } if len(schema.Columns) == 0 { return "", errors.New("table columns is not set") } fns := template.FuncMap{ "notLast": func(i, length int) bool { return i < length-1 }, "quoteString": common.QuoteString, } tplStr := "{{$columnNum := len .Schema.Columns}}" + "{{$partitionNum := len .Schema.PartitionColumns}}" + "{{$primaryKeysNum:= len .Schema.PrimaryKeys}}" + "create {{if .IsExternal -}} external {{ end -}} table {{ if .CreateIfNotExists }}if not exists{{ end }} " + "{{.ProjectName}}.{{if ne .SchemaName \"\"}}`{{.SchemaName}}`.{{end}}`{{.Schema.TableName}}` (\n" + "{{ range $i, $column := .Schema.Columns }}" + " `{{.Name}}` {{.Type.Name | print}} {{ if .NotNull }}not null{{ end }} {{ if ne .Comment \"\" }}comment {{quoteString .Comment}}{{ end }}{{ if notLast $i $columnNum }},{{ end }}\n" + "{{ end }}" + "{{ if gt $primaryKeysNum 0 }}" + ",primary key({{ range $i, $pk := .Schema.PrimaryKeys }} `{{.}}`{{ if notLast $i $primaryKeysNum }},{{ end }}{{ end }})" + "{{ end }}" + ")" + "{{ if ne .Schema.Comment \"\" }}" + "\ncomment {{quoteString .Schema.Comment}}" + "{{ end }}" + "{{ if gt $partitionNum 0 }}" + "{{ if (index .Schema.PartitionColumns 0).GenerateExpression }}" + "\nauto partitioned by (" + "{{ (index .Schema.PartitionColumns 0).GenerateExpression.String }} AS {{ (index .Schema.PartitionColumns 0).Name }}" + "{{ else }}" + "\npartitioned by (" + "{{ range $i, $partition := .Schema.PartitionColumns }}" + "`{{.Name}}` {{.Type | print}} {{- if ne .Comment \"\" }} comment {{quoteString .Comment}} {{- end -}} {{- if notLast $i $partitionNum }}, {{ end }}" + "{{ end -}}" + "{{ end -}}" + ")" + "{{ end }}" tpl, err := template.New("DDL_CREATE_TABLE").Funcs(fns).Parse(tplStr) if err != nil { panic(err) } type Data struct { ProjectName string SchemaName string Schema *TableSchema IsExternal bool CreateIfNotExists bool } data := Data{projectName, schemaName, schema, isExternal, createIfNotExists} var out bytes.Buffer err = tpl.Execute(&out, data) if err != nil { panic(err) } return out.String(), nil } func (schema *TableSchema) ToViewSQLString(projectName string, schemaName string, orReplace, createIfNotExists, buildDeferred bool) (string, error) { if schema.TableName == "" { return "", errors.New("view name is not set") } if (schema.IsVirtualView || schema.IsMaterializedView) && schema.ViewText == "" { return "", errors.New("view text is not set") } if schema.IsVirtualView && len(schema.PartitionColumns) > 0 { return "", errors.New("virtual view can not have partition columns") } if schema.IsVirtualView && schema.IsMaterializedView { return "", errors.New("virtual view and materialized view can not be both set") } if !schema.IsVirtualView && !schema.IsMaterializedView { return "", errors.New("either virtual view or materialized should be set") } fns := template.FuncMap{ "notLast": func(i, length int) bool { return i < length-1 }, "quoteString": common.QuoteString, } tplStr := "{{$columnNum := len .Schema.Columns}}" + "{{$partitionNum := len .Schema.PartitionColumns}}" + "{{$mvPropertiesNum:= len .Schema.MvProperties}}" + "{{$clusterColsNum:= len .Schema.ClusterInfo.ClusterCols}}" + "{{$clusterSortColsNum:= len .Schema.ClusterInfo.SortCols}}" + "create " + "{{if and .Schema.IsVirtualView .OrReplace -}}" + "or replace " + "{{end}}" + "{{if .Schema.IsMaterializedView -}}" + "materialized " + "{{ end -}}" + "view " + "{{ if .CreateIfNotExists }}" + "if not exists" + "{{ end }} " + "{{if ne .ProjectName \"\"}}" + "{{.ProjectName}}." + "{{end}}" + "{{if ne .SchemaName \"\"}}" + "`{{.SchemaName}}`." + "{{end}}" + "`{{.Schema.TableName}}`" + "{{ if .Schema.IsMaterializedView }}" + "{{ if gt .Schema.Lifecycle 0 }}" + "\nlifecycle {{.Schema.Lifecycle}}" + " {{end -}}" + "{{ if .BuildDeferred }}" + "\nbuild deferred" + " {{end -}}" + "{{end -}}" + "{{ if gt $columnNum 0 }}" + "(\n {{ range $i, $column := .Schema.Columns }}" + " `{{.Name}}` {{ if ne .Comment \"\" }}comment {{quoteString .Comment}}{{ end }}{{ if notLast $i $columnNum }},{{ end }}\n" + "{{ end }}" + ")" + "{{ end }}" + "{{ if .Schema.IsMaterializedView }}" + "{{ if not .Schema.IsMaterializedViewRewriteEnabled}}" + " disable rewrite " + "{{end -}}" + "{{end -}}" + "{{ if ne .Schema.Comment \"\" }}" + "\ncomment {{quoteString .Schema.Comment}}" + "{{ end }}" + "{{ if .Schema.IsMaterializedView }}" + "{{ if gt $partitionNum 0 }}" + "\npartitioned by (" + "{{ range $i, $partition := .Schema.PartitionColumns }}" + "\n`{{.Name}}`" + " {{- if notLast $i $partitionNum }}" + "," + " {{ end }}" + "{{ end -}}" + ")" + "{{ end }}" + "{{ if gt $clusterColsNum 0 }}" + "{{ if eq .Schema.ClusterInfo.ClusterType \"hash\" }}" + " \nclustered by (" + "{{ range $i, $clusterCol := .Schema.ClusterInfo.ClusterCols }}" + "`{{.}}`" + "{{ if notLast $i $clusterColsNum }},{{ end }}" + "{{end -}}" + ")" + "{{else if eq .Schema.ClusterInfo.ClusterType \"range\"}}" + "\nrange clustered by (" + "{{ range $i, $clusterCol := .Schema.ClusterInfo.ClusterCols }}" + "`{{.}}`" + "{{ if notLast $i $clusterColsNum }},{{ end }}" + "{{end -}}" + ")" + "{{end -}}" + "{{ if gt $clusterSortColsNum 0 }}" + "\nsorted by (" + "{{ range $i, $clusterSortCol := .Schema.ClusterInfo.SortCols }}" + "`{{.Name}}` {{.Order}}" + "{{ if notLast $i $clusterSortColsNum }},{{ end }}" + "{{end -}}" + ")" + "{{end -}}" + "{{ if .Schema.ClusterInfo.BucketNum }}" + " into {{.Schema.ClusterInfo.BucketNum}} buckets" + "{{end -}}" + "{{end -}}" + "{{ if .MvProperties }} " + "\nTBLPROPERTIES (" + "{{ range $i, $kv := .MvProperties }}" + "\"{{$kv.Key}}\"=\"{{$kv.Value}}\"" + "{{ if notLast $i $mvPropertiesNum }},{{ end }}" + "{{end -}}" + ")" + "{{end}}" + "{{end}}" + "\nas {{.Schema.ViewText}};" tpl, err := template.New("DDL_CREATE_VIEW").Funcs(fns).Parse(tplStr) if err != nil { panic(err) } type kvPair struct { Key string Value string } type Data struct { ProjectName string SchemaName string Schema *TableSchema MvProperties []kvPair OrReplace bool CreateIfNotExists bool BuildDeferred bool } data := Data{ProjectName: projectName, SchemaName: schemaName, Schema: schema, OrReplace: orReplace, CreateIfNotExists: createIfNotExists, BuildDeferred: buildDeferred} for k, v := range schema.MvProperties { data.MvProperties = append(data.MvProperties, kvPair{k, v}) } var out bytes.Buffer err = tpl.Execute(&out, data) if err != nil { panic(err) } return out.String(), nil } func (schema *TableSchema) ToSQLString(projectName string, schemaName string, createIfNotExists bool) (string, error) { baseSQL, err := schema.ToBaseSQLString(projectName, schemaName, createIfNotExists, false) if err != nil { return "", errors.WithStack(err) } // 添加hash clustering或range clustering clusterInfo := schema.ClusterInfo if len(clusterInfo.ClusterCols) > 0 { quotedColumns := make([]string, len(clusterInfo.ClusterCols)) for i, columnName := range clusterInfo.ClusterCols { quotedColumns[i] = common.QuoteRef(columnName) } if clusterInfo.ClusterType == CLUSTER_TYPE.Hash { baseSQL += "\nclustered by (" + strings.Join(quotedColumns, ", ") + ")" } if clusterInfo.ClusterType == CLUSTER_TYPE.Range { baseSQL += "\nrange clustered by (" + strings.Join(quotedColumns, ", ") + ")" } sortColsNum := len(clusterInfo.SortCols) if sortColsNum > 0 { baseSQL += "\nsorted by (" for i, sc := range clusterInfo.SortCols { baseSQL += common.QuoteRef(sc.Name) + " " + string(sc.Order) if i < sortColsNum-1 { baseSQL += ", " } } baseSQL += ")" } if clusterInfo.BucketNum > 0 { baseSQL += "\nINTO " + strconv.Itoa(clusterInfo.BucketNum) + " BUCKETS" } } if len(schema.TblProperties) > 0 { baseSQL += "\nTBLPROPERTIES (" i := 0 sortColsNum := len(schema.TblProperties) for k, v := range schema.TblProperties { baseSQL += fmt.Sprintf("%s=%s", common.QuoteString(k), common.QuoteString(v)) i++ if i < sortColsNum { baseSQL += ", " } } baseSQL += ")" } if schema.Lifecycle > 0 { baseSQL += fmt.Sprintf("\nlifecycle %d", schema.Lifecycle) } baseSQL += ";" return baseSQL, nil } func (schema *TableSchema) ToExternalSQLString( projectName string, schemaName string, createIfNotExists bool, serdeProperties map[string]string, jars []string, ) (string, error) { if schema.StorageHandler == "" { return "", errors.New("TableSchema.StorageHandler is not set") } if schema.Location == "" { return "", errors.New("TableSchema.Location is not set") } baseSQL, err := schema.ToBaseSQLString(projectName, schemaName, createIfNotExists, true) if err != nil { return "", errors.WithStack(err) } var builder strings.Builder builder.WriteString(baseSQL) // stored by, 用于指定自定义格式StorageHandler的类名或其他外部表文件格式 builder.WriteString(fmt.Sprintf("\nstored by '%s'\n", schema.StorageHandler)) // serde properties, 序列化属性参数 if len(serdeProperties) > 0 { builder.WriteString("with serdeproperties(") i, n := 0, len(serdeProperties) for key, value := range serdeProperties { builder.WriteString(fmt.Sprintf("%s=%s", common.QuoteString(key), common.QuoteString(value))) i += 1 if i < n { builder.WriteString(", ") } } builder.WriteString(")\n") } // location, 外部表存放地址 builder.WriteString(fmt.Sprintf("location '%s'\n", schema.Location)) // 自定义格式时类定义所在的jar if len(jars) > 0 { builder.WriteString("using '") n := len(jars) for i, jar := range jars { builder.WriteString(jar) if i < n-1 { builder.WriteString(", ") } } builder.WriteString("'\n") } if len(schema.TblProperties) > 0 { builder.WriteString("TBLPROPERTIES (") i := 0 sortColsNum := len(schema.TblProperties) for k, v := range schema.TblProperties { builder.WriteString(fmt.Sprintf("%s=%s", common.QuoteString(k), common.QuoteString(v))) i++ if i < sortColsNum { builder.WriteString(", ") } } builder.WriteString(")\n") } if schema.Lifecycle > 0 { builder.WriteString(fmt.Sprintf("lifecycle %d", schema.Lifecycle)) } builder.WriteRune(';') return builder.String(), nil } func (schema *TableSchema) ToArrowSchema() *arrow.Schema { fields := make([]arrow.Field, len(schema.Columns)) for i, column := range schema.Columns { arrowType, _ := TypeToArrowType(column.Type) fields[i] = arrow.Field{ Name: column.Name, Type: arrowType, Nullable: !column.NotNull, } } return arrow.NewSchema(fields, nil) } func (schema *TableSchema) FieldByName(name string) (Column, bool) { for _, c := range schema.Columns { if c.Name == name { return c, true } } return Column{}, false } // GeneratePartitionSpec Used for Auto-Partition tables to automatically generate partition columns based on Record func (schema *TableSchema) GeneratePartitionSpec(record *data.Record) (string, error) { var builder strings.Builder for index, partitionColumn := range schema.PartitionColumns { if partitionColumn.GenerateExpression != nil { if index > 0 { builder.WriteString("/") } generate, err := partitionColumn.GenerateExpression.generate(record, schema) if err != nil { return "", err } builder.WriteString(partitionColumn.Name + "=" + generate) } } return builder.String(), nil }