odps/partition.go (181 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package odps
import (
"encoding/json"
"encoding/xml"
"fmt"
"net/url"
"strings"
"time"
"github.com/pkg/errors"
"github.com/aliyun/aliyun-odps-go-sdk/odps/common"
)
// Partition represents one specific partition of an ODPS partitioned table.
//
// A Partition is a local handle: the metadata kept in model and extendedModel
// stays at its zero value until Load and LoadExtended, respectively, have been
// called successfully.
type Partition struct {
// odpsIns supplies the REST client used by Load and LoadExtended.
odpsIns *Odps
// projectName is the project set on the resource path of every request.
projectName string
// schemaName is sent as the "curr_schema" query argument when it is non-empty.
schemaName string
// tableName is the table this partition belongs to.
tableName string
// model holds the partition columns and the basic metadata filled in by Load.
model partitionModel
// extendedModel holds the extended metadata filled in by LoadExtended.
extendedModel partitionExtendedModel
}
// PartitionColumn is a single name/value pair of a partition value, e.g.
// Name "a" and Value "xx" for the segment "a=xx".
type PartitionColumn struct {
// Name is the partition column name (the text before "=").
Name string
// Value is the partition column value (the text after "=").
Value string
}
// partitionModel is the basic partition metadata. Load decodes it from the
// JSON document carried in the Schema element of the server response.
type partitionModel struct {
// Value lists the partition columns. Load never overwrites it; it keeps the
// columns the Partition was constructed with.
Value []PartitionColumn `json:"column"`
// CreateTime is the creation time of the partition.
CreateTime common.GMTTime `json:"createTime"`
// LastDDLTime is the time the partition's metadata was last changed.
LastDDLTime common.GMTTime `json:"lastDDLTime"`
// LastAccessTime is the time the partition was last accessed.
LastAccessTime common.GMTTime `json:"lastAccessTime"`
// LastModifiedTime is the time the partition was last modified.
LastModifiedTime common.GMTTime `json:"lastModifiedTime"`
// PartitionRecordNum is the number of records in the partition; RecordNum
// documents -1 as the value reported when no accurate count is available.
PartitionRecordNum int `json:"partitionRecordNum"`
// PartitionSize is the size of the partition.
// NOTE(review): presumably in bytes — confirm against the service documentation.
PartitionSize int `json:"partitionSize"`
}
// partitionExtendedModel is the extended partition metadata. LoadExtended
// decodes it from the JSON document carried in the Schema element of the
// server response to a request made with the "extended" query argument.
type partitionExtendedModel struct {
// FileNum is the file count reported for the partition.
FileNum int `json:"FileNum"`
// IsArchived is the archive flag reported for the partition.
IsArchived bool `json:"IsArchived"`
// IsExStore is the flag decoded from the "IsExstore" key.
// NOTE(review): exact meaning is not visible in this file — confirm.
IsExStore bool `json:"IsExstore"`
// LifeCycle is the lifecycle value reported for the partition.
// NOTE(review): unit (presumably days) is not visible in this file — confirm.
LifeCycle int `json:"LifeCycle"`
// PhysicalSize is the physical size reported for the partition.
// NOTE(review): presumably in bytes — confirm.
PhysicalSize int `json:"PhysicalSize"`
// Reserved is the reserved field of the extended information, a JSON string.
Reserved string `json:"Reserved"`
}
// NewPartition returns a Partition handle for the given table and partition
// value. It does not create a partition in MaxCompute and performs no request;
// it only builds the local object.
//
// value is a "/"-separated list of name=value segments such as "a=xx/b=yy".
// Parsing is lenient so that malformed input can never panic:
//   - empty segments (empty value, leading/trailing or doubled "/") are skipped;
//   - a segment is split on its first "=" only, so the value part may itself
//     contain "=";
//   - a segment without "=" yields a column with that name and an empty value.
func NewPartition(odpsIns *Odps, projectName, schemaName, tableName string, value string) *Partition {
	segments := strings.Split(value, "/")
	columns := make([]PartitionColumn, 0, len(segments))

	for _, segment := range segments {
		if segment == "" {
			continue
		}

		// SplitN with a limit of 2 keeps everything after the first "=" intact.
		kv := strings.SplitN(segment, "=", 2)
		column := PartitionColumn{Name: kv[0]}
		if len(kv) == 2 {
			column.Value = kv[1]
		}

		columns = append(columns, column)
	}

	return &Partition{
		odpsIns:     odpsIns,
		projectName: projectName,
		schemaName:  schemaName,
		tableName:   tableName,
		model:       partitionModel{Value: columns},
	}
}
// Name returns the partition value formatted like "a=xx/b=yy". It is an alias
// of Value.
//
// Deprecated: Do not use this function. Use Value instead.
func (p *Partition) Name() string {
	value := p.Value()
	return value
}
// Value returns the partition value in path form, e.g. "a=xx/b=yy": every
// column is rendered as name=value and the columns are joined with "/".
// A partition without columns yields the empty string.
func (p *Partition) Value() string {
	pairs := make([]string, 0, len(p.model.Value))
	for _, column := range p.model.Value {
		pairs = append(pairs, fmt.Sprintf("%s=%s", column.Name, column.Value))
	}
	return strings.Join(pairs, "/")
}
// Spec returns the partition value in spec form, e.g. "a='xx',b='yy'": every
// column is rendered as name='value' and the columns are separated by ",".
// Column values are inserted verbatim, without any quote escaping.
// A partition without columns yields the empty string.
func (p *Partition) Spec() string {
	var spec strings.Builder
	for idx, column := range p.model.Value {
		if idx > 0 {
			spec.WriteString(",")
		}
		fmt.Fprintf(&spec, "%s='%s'", column.Name, column.Value)
	}
	return spec.String()
}
// Load fetches the basic metadata of the partition and stores it on the
// receiver.
//
// It issues a GET on the table resource with the "partition" query argument
// set to Spec() and, when schemaName is non-empty, "curr_schema" set to it.
// The response is an XML <Partition> element whose Schema child carries a JSON
// document; that document is decoded into a partitionModel. Every metadata
// field except the partition columns is then copied onto p.model.
//
// Any request or decoding error is returned wrapped with a stack trace, and
// the receiver is left unchanged in that case.
func (p *Partition) Load() error {
	var builder common.ResourceBuilder
	builder.SetProject(p.projectName)
	tableResource := builder.Table("", p.tableName)
	restClient := p.odpsIns.restClient

	query := url.Values{}
	query.Set("partition", p.Spec())
	if p.schemaName != "" {
		query.Set("curr_schema", p.schemaName)
	}

	// Only the Schema element of the response is needed; it holds the JSON payload.
	var response struct {
		XMLName xml.Name `xml:"Partition"`
		Schema  string
	}
	if err := restClient.GetWithModel(tableResource, query, nil, &response); err != nil {
		return errors.WithStack(err)
	}

	var loaded partitionModel
	if err := json.Unmarshal([]byte(response.Schema), &loaded); err != nil {
		return errors.WithStack(err)
	}

	// The partition columns (p.model.Value) are deliberately left as they are.
	p.model.CreateTime = loaded.CreateTime
	p.model.LastDDLTime = loaded.LastDDLTime
	p.model.LastAccessTime = loaded.LastAccessTime
	p.model.LastModifiedTime = loaded.LastModifiedTime
	p.model.PartitionRecordNum = loaded.PartitionRecordNum
	p.model.PartitionSize = loaded.PartitionSize
	return nil
}
// LoadExtended fetches the extended metadata of the partition and stores it
// on the receiver.
//
// It issues a GET on the table resource with the "partition" query argument
// set to Spec(), an empty "extended" argument and, when schemaName is
// non-empty, "curr_schema" set to it. The response is an XML <Partition>
// element whose Schema child carries a JSON document; that document is
// decoded and replaces p.extendedModel as a whole.
//
// Any request or decoding error is returned wrapped with a stack trace, and
// the receiver is left unchanged in that case.
func (p *Partition) LoadExtended() error {
	var builder common.ResourceBuilder
	builder.SetProject(p.projectName)
	tableResource := builder.Table("", p.tableName)
	restClient := p.odpsIns.restClient

	query := url.Values{}
	query.Set("partition", p.Spec())
	query.Set("extended", "")
	if p.schemaName != "" {
		query.Set("curr_schema", p.schemaName)
	}

	// Only the Schema element of the response is needed; it holds the JSON payload.
	var response struct {
		XMLName xml.Name `xml:"Partition"`
		Schema  string
	}
	if err := restClient.GetWithModel(tableResource, query, nil, &response); err != nil {
		return errors.WithStack(err)
	}

	var loaded partitionExtendedModel
	if err := json.Unmarshal([]byte(response.Schema), &loaded); err != nil {
		return errors.WithStack(err)
	}

	p.extendedModel = loaded
	return nil
}
// CreatedTime returns the creation time of the partition as stored in the
// basic metadata. It is the zero time until Load has populated the metadata.
func (p *Partition) CreatedTime() time.Time {
	created := p.model.CreateTime
	return time.Time(created)
}
// LastDDLTime returns the time the partition's metadata was last modified.
// It is the zero time until Load has populated the metadata.
func (p *Partition) LastDDLTime() time.Time {
	lastDDL := p.model.LastDDLTime
	return time.Time(lastDDL)
}
// LastModifiedTime returns the last-modified time of the partition as stored
// in the basic metadata. It is the zero time until Load has populated the
// metadata.
func (p *Partition) LastModifiedTime() time.Time {
	lastModified := p.model.LastModifiedTime
	return time.Time(lastModified)
}
// RecordNum returns the number of records in the partition, or -1 when no
// accurate count is available. The value comes from the basic metadata and
// is 0 until Load has populated it.
func (p *Partition) RecordNum() int {
	records := p.model.PartitionRecordNum
	return records
}
// Size returns the partition size stored in the basic metadata. It is 0 until
// Load has populated the metadata.
// NOTE(review): the unit is presumably bytes — confirm against the service docs.
func (p *Partition) Size() int {
	size := p.model.PartitionSize
	return size
}
// IsArchivedEx returns the IsArchived flag of the extended metadata. It is
// false until LoadExtended has populated the extended metadata.
func (p *Partition) IsArchivedEx() bool {
	archived := p.extendedModel.IsArchived
	return archived
}
// IsExStoreEx returns the IsExStore flag of the extended metadata. It is
// false until LoadExtended has populated the extended metadata.
//
// It is the exported counterpart of isExStoreEx, matching the other *Ex
// accessors, so that the flag can be read from outside the package.
func (p *Partition) IsExStoreEx() bool {
	return p.extendedModel.IsExStore
}

// isExStoreEx is the original, unexported spelling of IsExStoreEx. It is kept
// so that existing in-package callers continue to compile.
//
// Deprecated: Use IsExStoreEx instead.
func (p *Partition) isExStoreEx() bool {
	return p.IsExStoreEx()
}
// LifeCycleEx returns the LifeCycle value of the extended metadata. It is 0
// until LoadExtended has populated the extended metadata.
// NOTE(review): the unit is presumably days — confirm against the service docs.
func (p *Partition) LifeCycleEx() int {
	lifeCycle := p.extendedModel.LifeCycle
	return lifeCycle
}
// PhysicalSizeEx returns the PhysicalSize value of the extended metadata. It
// is 0 until LoadExtended has populated the extended metadata.
// NOTE(review): the unit is presumably bytes — confirm against the service docs.
func (p *Partition) PhysicalSizeEx() int {
	physicalSize := p.extendedModel.PhysicalSize
	return physicalSize
}
// FileNumEx returns the FileNum value of the extended metadata. It is 0 until
// LoadExtended has populated the extended metadata.
func (p *Partition) FileNumEx() int {
	fileNum := p.extendedModel.FileNum
	return fileNum
}
// ReservedEx returns the reserved field of the extended information, a JSON
// string. It is empty until LoadExtended has populated the extended metadata.
func (p *Partition) ReservedEx() string {
	reserved := p.extendedModel.Reserved
	return reserved
}