odps/resources.go (306 lines of code) (raw):
package odps
import (
"bytes"
"crypto/md5"
"encoding/hex"
"encoding/xml"
"fmt"
"io"
"io/ioutil"
"net/url"
"strconv"
"strings"
"time"
"github.com/pkg/errors"
"github.com/aliyun/aliyun-odps-go-sdk/odps/common"
)
// Resources is the entry point for listing, creating and deleting the
// resources of one project.
type Resources struct {
// ProjectName is the project used to build request paths; NewResources
// fills it from the Odps instance's default project.
ProjectName string
// schemaName is sent as the "curr_schema" query argument; NewResources
// fills it from the Odps instance's current schema.
schemaName string
// OdpsIns is the Odps instance whose rest client performs the requests.
OdpsIns *Odps
// ChunkSize is the size in bytes of the buffer CreateFileResource reads
// into before uploading a part; NewResources sets it to 64 << 20.
ChunkSize int
}
// NewResources returns a Resources bound to OdpsIns. The project name and
// schema name are taken from OdpsIns' default project and current schema, and
// the upload chunk size is initialised to 64 MiB (64 << 20 bytes).
func NewResources(OdpsIns *Odps) *Resources {
	rs := new(Resources)
	rs.OdpsIns = OdpsIns
	rs.ProjectName = OdpsIns.DefaultProjectName()
	rs.schemaName = OdpsIns.CurrentSchemaName()
	rs.ChunkSize = 64 << 20
	return rs
}
// List walks the resources of r.ProjectName and hands each one to f.
//
// Every non-nil filter is applied to the query arguments before the first
// request. For each resource in a response, f is called with the resource and
// a nil error. If a request fails, f is called once with a nil resource and
// the error, and the walk stops. Further pages are requested for as long as a
// response carries a non-empty Marker.
func (r *Resources) List(f func(*Resource, error), filters ...RFileFunc) {
	// ResModel mirrors the XML document returned by the listing endpoint.
	type ResModel struct {
		XMLName   xml.Name        `xml:"Resources"`
		Resources []resourceModel `xml:"Resource"`
		Marker    string
		MaxItems  int
	}

	queryArgs := make(url.Values, 4)
	queryArgs.Set("expectmarker", "true")
	queryArgs.Set("curr_schema", r.schemaName)
	for _, apply := range filters {
		if apply == nil {
			continue
		}
		apply(queryArgs)
	}

	builder := common.ResourceBuilder{ProjectName: r.ProjectName}
	path := builder.Resources()
	client := r.OdpsIns.restClient

	var page ResModel
	for {
		if err := client.GetWithModel(path, queryArgs, nil, &page); err != nil {
			f(nil, err)
			return
		}

		for _, model := range page.Resources {
			res := NewResource(model, r.ProjectName, r.OdpsIns)
			res.beLoaded = true
			f(res, nil)
		}

		if page.Marker == "" {
			return
		}

		// Ask for the next page and start from an empty model so entries of
		// the previous page are not carried over.
		queryArgs.Set("marker", page.Marker)
		page = ResModel{}
	}
}
// Get returns a Resource for resourceName in r.ProjectName. It only builds the
// value through NewResource from a model carrying the name.
func (r *Resources) Get(resourceName string) *Resource {
	model := resourceModel{Name: resourceName}
	return NewResource(model, r.ProjectName, r.OdpsIns)
}
// CreateFileResource uploads the content of lfr as a file resource.
//
// The reader of lfr is consumed in chunks of r.ChunkSize bytes. Every chunk is
// uploaded as a temporary part resource named
// "[<schema>.]<name>.part.tmp.<6-digit index>", and a final merge request
// carrying "<md5 of all bytes>|<comma separated part names>" is sent once the
// reader is exhausted.
//
// An empty projectName or schemaName falls back to the value stored in r.
// overwrite is forwarded to the merge request. An error is returned when the
// resource name is empty, the reader is nil, reading fails, or any upload or
// the merge fails.
func (r *Resources) CreateFileResource(projectName, schemaName string, lfr ReaderResource, overwrite bool) error {
	if projectName == "" {
		projectName = r.ProjectName
	}
	if schemaName == "" {
		schemaName = r.schemaName
	}
	if lfr.Name() == "" {
		return errors.New("resource name cannot be empty")
	}
	src := lfr.Reader()
	if src == nil {
		return errors.New("reader cannot be nil")
	}

	chunkSize := r.ChunkSize
	if chunkSize <= 0 {
		// A zero-length buffer never makes progress and a negative length
		// panics in make; fall back to the 64 MiB default of NewResources.
		chunkSize = 64 << 20
	}

	buf := make([]byte, chunkSize)
	hash := md5.New()
	var (
		partNames  []string
		totalBytes int64
	)
	for {
		// io.ReadFull keeps reading until buf is full, so a reader that
		// returns short reads does not produce needlessly small parts.
		n, readErr := io.ReadFull(src, buf)

		// Handle the bytes before looking at the error: a reader may return
		// data together with a non-nil error, and that data must not be lost.
		if n > 0 {
			chunk := buf[:n]
			hash.Write(chunk)

			var partName string
			if schemaName != "" {
				partName = fmt.Sprintf("%s.%s.part.tmp.%06d", schemaName, lfr.Name(), len(partNames))
			} else {
				partName = fmt.Sprintf("%s.part.tmp.%06d", lfr.Name(), len(partNames))
			}

			partBody := bytes.NewBuffer(chunk)
			part := NewFileResource(partName)
			part.SetReader(partBody)
			part.SetIsTempResource(true)
			partNames = append(partNames, partName)

			if err := r.createTempPartFile(projectName, schemaName, *part, partBody, int64(n)); err != nil {
				return errors.WithStack(err)
			}
			totalBytes += int64(n)
		}

		// io.EOF: nothing was left; io.ErrUnexpectedEOF: the last chunk was
		// shorter than the buffer. Both mean the source is exhausted.
		if readErr == io.EOF || readErr == io.ErrUnexpectedEOF {
			break
		}
		if readErr != nil {
			return readErr
		}
	}

	md5sum := hex.EncodeToString(hash.Sum(nil))
	commitContent := fmt.Sprintf("%s|%s", md5sum, strings.Join(partNames, ","))
	commitReader := bytes.NewBuffer([]byte(commitContent))

	merged := FileResource{Resource: Resource{Model: resourceModel{ResourceType: ResourceTypeFile}}}
	merged.SetName(lfr.Name())
	merged.SetComment(lfr.Comment())
	merged.SetIsTempResource(lfr.IsTempResource())
	merged.SetResourceType(lfr.ResourceType())

	return errors.WithStack(r.mergeTempPartFile(projectName, schemaName, merged, commitReader, overwrite, totalBytes))
}
// UpdateFileResource uploads lfr through CreateFileResource with overwriting
// enabled. Arguments and returned error are those of CreateFileResource.
func (r *Resources) UpdateFileResource(projectName, schemaName string, lfr ReaderResource) error {
	const overwrite = true
	return r.CreateFileResource(projectName, schemaName, lfr, overwrite)
}
// createTempPartFile uploads one chunk of a file resource as a temporary part.
//
// The unread bytes of reader are POSTed to the resources endpoint of
// projectName with the query argument rIsPart=true (plus curr_schema when
// schemaName is not empty). fr supplies the name, type, comment and temp flag
// sent as headers, and length is sent as the Content-Length header value.
//
// An error is returned when fr has no name, is not a temp resource, has a
// volume-file/archive/table/unknown type, when the request cannot be built or
// sent, or when the response status is not 201; in the last case the response
// body becomes the error message.
func (r *Resources) createTempPartFile(projectName, schemaName string, fr FileResource, reader *bytes.Buffer, length int64) error {
	if fr.Name() == "" {
		return errors.New("Temp part resource name cannot be empty")
	}
	if !fr.IsTempResource() {
		return errors.New("Temp part resource must be temp resource")
	}
	if fr.ResourceType() == ResourceTypeVolumeFile || fr.ResourceType() == ResourceTypeArchive || fr.ResourceType() == ResourceTypeTable || fr.ResourceType() == ResourceTypeUnknown {
		return errors.New("Temp part resource type must be file")
	}

	rb := common.ResourceBuilder{ProjectName: projectName}
	resource := rb.Resources()

	// Digest and request body are both derived from the same byte slice.
	payload := reader.Bytes()
	digest := md5.New()
	digest.Write(payload)

	headers := make(map[string]string)
	headers[common.HttpHeaderContentType] = "application/octet-stream"
	headers[common.HttpHeaderContentDisposition] = "attachment;filename=" + fr.Name()
	headers[common.HttpHeaderOdpsResourceType] = ResourceTypeToStr(fr.ResourceType())
	headers[common.HttpHeaderOdpsResourceName] = fr.Name()
	if fr.Comment() != "" {
		headers[common.HttpHeaderOdpsComment] = fr.Comment()
	}
	headers[common.HttpHeaderOdpsResourceIsTemp] = strconv.FormatBool(fr.IsTempResource())
	headers[common.HttpHeaderContentLength] = strconv.FormatInt(length, 10)
	headers[common.HttpHeaderContentMD5] = hex.EncodeToString(digest.Sum(nil))

	queryArgs := make(url.Values, 4)
	if schemaName != "" {
		queryArgs.Set("curr_schema", schemaName)
	}
	queryArgs.Set("rIsPart", "true")

	client := r.OdpsIns.restClient
	req, err := client.NewRequestWithParamsAndHeaders(common.HttpMethod.PostMethod, resource, bytes.NewReader(payload), queryArgs, headers)
	if err != nil {
		return errors.WithStack(err)
	}

	client.HttpTimeout = 60 * time.Second
	resp, err := client.Do(req)
	if err != nil {
		return errors.WithStack(err)
	}
	// The body must be closed on every path, otherwise the underlying
	// connection is leaked.
	defer resp.Body.Close()

	if resp.StatusCode != 201 {
		// Use ioutil.ReadAll instead of io.ReadAll for compatibility with Go 1.15.
		// The read error is ignored on purpose: whatever part of the body was
		// received is still the most useful error message available.
		data, _ := ioutil.ReadAll(resp.Body)
		return errors.WithStack(errors.New(string(data)))
	}
	return nil
}
// mergeTempPartFile sends the request that merges previously uploaded parts
// into the resource described by fr.
//
// The unread bytes of reader form the request body; CreateFileResource passes
// "<md5>|<comma separated part names>" here. With overwrite the request is a
// PUT on the resource itself, otherwise a POST on the resources collection.
// The query argument rOpMerge=true is always set, curr_schema only when
// schemaName is not empty. totalBytes is sent in the merge-total-bytes header.
//
// An error is returned when fr has no name, when the request cannot be built
// or sent, or when the response status is not 201; in the last case the
// response body becomes the error message.
func (r *Resources) mergeTempPartFile(projectName, schemaName string, fr FileResource, reader *bytes.Buffer, overwrite bool, totalBytes int64) error {
	if fr.Name() == "" {
		return errors.New("File resource name cannot be empty")
	}

	rb := common.NewResourceBuilder(projectName)
	var resource, method string
	if overwrite {
		resource = rb.Resource(fr.Name())
		method = common.HttpMethod.PutMethod
	} else {
		resource = rb.Resources()
		method = common.HttpMethod.PostMethod
	}

	// Digest, Content-Length and request body are all derived from the same
	// byte slice so they cannot disagree.
	payload := reader.Bytes()
	digest := md5.New()
	digest.Write(payload)

	headers := make(map[string]string)
	headers[common.HttpHeaderContentType] = "application/octet-stream"
	headers[common.HttpHeaderContentDisposition] = "attachment;filename=" + fr.Name()
	headers[common.HttpHeaderOdpsResourceType] = ResourceTypeToStr(fr.ResourceType())
	headers[common.HttpHeaderOdpsResourceName] = fr.Name()
	if fr.Comment() != "" {
		headers[common.HttpHeaderOdpsComment] = fr.Comment()
	}
	if fr.IsTempResource() {
		headers[common.HttpHeaderOdpsResourceIsTemp] = strconv.FormatBool(fr.IsTempResource())
	}
	headers[common.HttpHeaderOdpsResourceMergeTotalBytes] = strconv.FormatInt(totalBytes, 10)
	// Content-Length describes the body actually sent (the commit content),
	// not the size of the merged file, which travels in the header above.
	headers[common.HttpHeaderContentLength] = strconv.Itoa(len(payload))
	headers[common.HttpHeaderContentMD5] = hex.EncodeToString(digest.Sum(nil))

	queryArgs := make(url.Values, 4)
	if schemaName != "" {
		queryArgs.Set("curr_schema", schemaName)
	}
	queryArgs.Set("rOpMerge", "true")

	client := r.OdpsIns.restClient
	req, err := client.NewRequestWithParamsAndHeaders(method, resource, bytes.NewReader(payload), queryArgs, headers)
	if err != nil {
		return errors.WithStack(err)
	}

	resp, err := client.Do(req)
	if err != nil {
		return errors.WithStack(err)
	}
	// The body must be closed on every path, otherwise the underlying
	// connection is leaked.
	defer resp.Body.Close()

	if resp.StatusCode != 201 {
		// Use ioutil.ReadAll instead of io.ReadAll for compatibility with Go 1.15.
		// The read error is ignored on purpose: whatever part of the body was
		// received is still the most useful error message available.
		data, _ := ioutil.ReadAll(resp.Body)
		return errors.WithStack(errors.New(string(data)))
	}
	return nil
}
// Delete sends a DELETE request for resourceName in r.ProjectName, adding the
// curr_schema query argument when r has a schema name. The request uses a 60
// second HTTP timeout. An error is returned when the request cannot be built
// or when sending it fails; the response status is not inspected here.
func (r *Resources) Delete(resourceName string) error {
	rb := common.NewResourceBuilder(r.ProjectName)
	resource := rb.Resource(resourceName)

	queryArgs := make(url.Values, 4)
	if r.schemaName != "" {
		queryArgs.Set("curr_schema", r.schemaName)
	}

	client := r.OdpsIns.restClient
	req, err := client.NewRequestWithUrlQuery(common.HttpMethod.DeleteMethod, resource, nil, queryArgs)
	if err != nil {
		return errors.WithStack(err)
	}

	client.HttpTimeout = 60 * time.Second
	resp, err := client.Do(req)
	if err != nil {
		return errors.WithStack(err)
	}
	// The response content is not needed, but the body still has to be closed
	// so the underlying connection is not leaked. A failure to close cannot
	// change the outcome of the delete, so its error is dropped.
	_ = resp.Body.Close()
	return nil
}
// CreateTableResource registers tr as a table resource.
//
// An empty projectName or schemaName falls back to the value stored in r.
// With overwrite the request is a PUT on the resource itself, otherwise a POST
// on the resources collection. The source table name of tr is sent in the
// copy-table-source header; the request has no body and uses a 60 second HTTP
// timeout.
//
// An error is returned when tr has no name, when the request cannot be built
// or sent, or when the response status is not 201; in the last case the
// response body becomes the error message.
func (r *Resources) CreateTableResource(projectName, schemaName string, tr TableResource, overwrite bool) error {
	if projectName == "" {
		projectName = r.ProjectName
	}
	if schemaName == "" {
		schemaName = r.schemaName
	}
	if tr.Name() == "" {
		return errors.New("resource name cannot be empty")
	}

	rb := common.NewResourceBuilder(projectName)
	var resource, method string
	if overwrite {
		resource = rb.Resource(tr.Name())
		method = common.HttpMethod.PutMethod
	} else {
		resource = rb.Resources()
		method = common.HttpMethod.PostMethod
	}

	headers := make(map[string]string)
	headers[common.HttpHeaderContentType] = "text/plain"
	headers[common.HttpHeaderOdpsResourceType] = ResourceTypeToStr(tr.ResourceType())
	headers[common.HttpHeaderOdpsResourceName] = tr.Name()
	headers[common.HttpHeaderOdpsCopyTableSource] = tr.getSourceTableName()
	if tr.Comment() != "" {
		headers[common.HttpHeaderOdpsComment] = tr.Comment()
	}

	queryArgs := make(url.Values, 4)
	if schemaName != "" {
		queryArgs.Set("curr_schema", schemaName)
	}

	client := r.OdpsIns.restClient
	req, err := client.NewRequestWithParamsAndHeaders(method, resource, nil, queryArgs, headers)
	if err != nil {
		return errors.WithStack(err)
	}

	client.HttpTimeout = 60 * time.Second
	resp, err := client.Do(req)
	if err != nil {
		return errors.WithStack(err)
	}
	// The body must be closed on every path, otherwise the underlying
	// connection is leaked.
	defer resp.Body.Close()

	if resp.StatusCode != 201 {
		// Use ioutil.ReadAll instead of io.ReadAll for compatibility with Go 1.15.
		// The read error is ignored on purpose: whatever part of the body was
		// received is still the most useful error message available.
		data, _ := ioutil.ReadAll(resp.Body)
		return errors.WithStack(errors.New(string(data)))
	}
	return nil
}
// RFileFunc adjusts the query arguments of a resource listing request.
// Resources.List calls every non-nil RFileFunc it is given with the url.Values
// it is about to send.
type RFileFunc func(url.Values)
// ResourceFilter groups the ready-made filters for listing resources.
//
// NOTE(review): NamePrefix is declared to return TFilterFunc although
// Resources.List takes RFileFunc. If TFilterFunc is a distinct defined type
// (its declaration is not visible here), callers need an explicit conversion
// to pass the result to List — confirm and consider returning RFileFunc.
var ResourceFilter = struct {
	// NamePrefix returns a filter that sets the "name" query argument to the
	// given string.
	NamePrefix func(string) TFilterFunc
}{
	NamePrefix: func(prefix string) TFilterFunc {
		return func(query url.Values) {
			query.Set("name", prefix)
		}
	},
}