odps/tunnel/upload_session.go

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tunnel

import (
    "encoding/json"
    "io"
    "net/http"
    "net/url"
    "strconv"
    "strings"

    "github.com/pkg/errors"

    "github.com/aliyun/aliyun-odps-go-sdk/arrow"
    "github.com/aliyun/aliyun-odps-go-sdk/odps/common"
    "github.com/aliyun/aliyun-odps-go-sdk/odps/restclient"
    "github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
)

type UploadStatus int

const (
    _ UploadStatus = iota
    UploadStatusUnknown
    UploadStatusNormal
    UploadStatusClosing
    UploadStatusClosed
    UploadStatusCanceled
    UploadStatusExpired
    UploadStatusCritical
    UploadStatusCommitting
)

// UploadSession works like an "insert into" statement; multiple sessions for
// the same table or partition do not affect each other. The session id is the
// unique identifier of a session.
//
// UploadSession uses OpenRecordArrowWriter to create a RecordArrowWriter, or
// OpenRecordWriter to create a RecordProtocWriter, for writing data into a
// table. Each RecordWriter uses an http connection to transfer data to the
// tunnel server, and each UploadSession can create multiple RecordWriters,
// so multiple http connections can be used to upload data in parallel.
//
// A block id must be given when creating a RecordWriter; it is the unique
// identifier of a writer. The block id must be a number in [0, 20000). A
// single RecordWriter can write at most 100GB of data. If multiple
// RecordWriters are created with the same block id, the data will be
// overwritten, and only the data from the writer that calls Close last will
// be kept.
//
// The timeout of the http connection used by a RecordWriter is 120s; the
// server will close the connection when no data is transferred on it for
// 120 seconds.
//
// The Commit method must be called to notify the server that all data has
// been uploaded and can be written into the table.
//
// In particular, the partition keys used by a session cannot contain "'".
// For example, "region=hangzhou" is valid, while "region='hangzhou'" is not.
// But partition keys like "region='hangzhou'" are more common, so to prevent
// users from using the wrong format, the partitionKey of UploadSession is
// private; it can be set when creating a session or via SetPartitionKey.
type UploadSession struct {
    Id          string
    ProjectName string
    // TODO use schema to get the resource url of a table
    SchemaName      string
    TableName       string
    QuotaName       string
    CreatePartition bool
    // The partition keys used by a session cannot contain "'". For example,
    // "region=hangzhou" is valid, while "region='hangzhou'" is not. But
    // partition keys like "region='hangzhou'" are more common, so to prevent
    // users from using the wrong format, the partitionKey of UploadSession is
    // private; it can be set when creating a session or via SetPartitionKey.
    partitionKey        string
    Overwrite           bool
    Compressor          Compressor
    RestClient          restclient.RestClient
    fieldMaxSize        int
    shouldTransformDate bool
    schema              tableschema.TableSchema
    status              UploadStatus
    arrowSchema         *arrow.Schema
    blockIds            []int
}

func (u *UploadSession) BlockIds() []int {
    return u.blockIds
}

func (u *UploadSession) PartitionKey() string {
    return u.partitionKey
}

func (u *UploadSession) SetPartitionKey(partitionKey string) {
    u.partitionKey = strings.ReplaceAll(partitionKey, "'", "")
    u.partitionKey = strings.ReplaceAll(u.partitionKey, "\"", "")
}

// CreateUploadSession creates a new upload session before uploading data.
// The opts can be one or more of:
// SessionCfg.WithPartitionKey
// SessionCfg.WithSchemaName, it doesn't work yet
// SessionCfg.WithDefaultDeflateCompressor, using deflate compressor with default level
// SessionCfg.WithDeflateCompressor, using deflate compressor with specific level
// SessionCfg.WithSnappyFramedCompressor
// SessionCfg.Overwrite, overwrite data
func CreateUploadSession(
    projectName, tableName, quotaName string,
    restClient restclient.RestClient,
    opts ...Option,
) (*UploadSession, error) {
    cfg := newSessionConfig(opts...)

    session := UploadSession{
        ProjectName:     projectName,
        SchemaName:      cfg.SchemaName,
        TableName:       tableName,
        QuotaName:       quotaName,
        partitionKey:    cfg.PartitionKey,
        RestClient:      restClient,
        Overwrite:       cfg.Overwrite,
        Compressor:      cfg.Compressor,
        CreatePartition: cfg.CreatePartition,
    }

    req, err := session.newInitiationRequest()
    if err != nil {
        return nil, errors.WithStack(err)
    }

    err = session.loadInformation(req)
    if err != nil {
        return nil, errors.WithStack(err)
    }

    return &session, nil
}
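// exampleUpload is a hypothetical usage sketch, not part of the SDK API: it
// creates a session, writes one block of data, and commits that block. The
// project, table, and quota names are placeholders, and Close on the writer
// is assumed from RecordProtocWriter's implementation elsewhere in this
// package.
func exampleUpload(restClient restclient.RestClient) error {
    session, err := CreateUploadSession("my_project", "my_table", "my_quota", restClient)
    if err != nil {
        return errors.WithStack(err)
    }

    // Block id 0 identifies this writer; it must be unique per writer and
    // lie in [0, 20000).
    writer, err := session.OpenRecordWriter(0)
    if err != nil {
        return errors.WithStack(err)
    }

    // Write records here, e.g. writer.Write(record) for each record, then
    // close the writer to flush the block to the tunnel server.
    if err := writer.Close(); err != nil {
        return errors.WithStack(err)
    }

    // Commit must list every block id that was written; otherwise the data
    // will not be moved into the table.
    return errors.WithStack(session.Commit([]int{0}))
}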
// AttachToExistedUploadSession gets an existing session by the session id.
// The opts can be one or more of:
// SessionCfg.WithPartitionKey
// SessionCfg.WithSchemaName, it doesn't work yet
// SessionCfg.WithDefaultDeflateCompressor, using deflate compressor with default level
// SessionCfg.WithDeflateCompressor, using deflate compressor with specific level
// SessionCfg.WithSnappyFramedCompressor
// SessionCfg.Overwrite, overwrite data
// SessionCfg.UseArrow, it is the default config
func AttachToExistedUploadSession(
    sessionId, projectName, tableName string,
    restClient restclient.RestClient,
    opts ...Option,
) (*UploadSession, error) {
    cfg := newSessionConfig(opts...)

    session := UploadSession{
        Id:           sessionId,
        ProjectName:  projectName,
        TableName:    tableName,
        partitionKey: cfg.PartitionKey,
        RestClient:   restClient,
        Overwrite:    cfg.Overwrite,
        Compressor:   cfg.Compressor,
    }

    err := session.Load()
    if err != nil {
        return nil, errors.WithStack(err)
    }

    return &session, nil
}

func (u *UploadSession) Schema() tableschema.TableSchema {
    return u.schema
}

func (u *UploadSession) ArrowSchema() *arrow.Schema {
    if u.arrowSchema != nil {
        return u.arrowSchema
    }
    u.arrowSchema = u.schema.ToArrowSchema()
    return u.arrowSchema
}

func (u *UploadSession) Status() UploadStatus {
    return u.status
}

func (u *UploadSession) ShouldTransform() bool {
    return u.shouldTransformDate
}

func (u *UploadSession) ResourceUrl() string {
    rb := common.NewResourceBuilder(u.ProjectName)
    return rb.Table(u.SchemaName, u.TableName)
}

func (u *UploadSession) OpenRecordArrowWriter(blockId int) (*RecordArrowWriter, error) {
    conn, err := u.newUploadConnection(blockId, true)
    if err != nil {
        return nil, errors.WithStack(err)
    }

    writer := newRecordArrowWriter(conn, u.arrowSchema)
    return &writer, nil
}

func (u *UploadSession) OpenRecordWriter(blockId int) (*RecordProtocWriter, error) {
    conn, err := u.newUploadConnection(blockId, false)
    if err != nil {
        return nil, errors.WithStack(err)
    }

    writer := newRecordProtocHttpWriter(conn, u.schema.Columns, false)
    return &writer, nil
}

func (u *UploadSession) Load() error {
    req, err := u.newLoadRequest()
    if err != nil {
        return errors.WithStack(err)
    }

    return errors.WithStack(u.loadInformation(req))
}
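// exampleResume is a hypothetical sketch, not part of the SDK API: it
// re-attaches to a session by id, checks the server-side status, and commits
// the blocks the server has already received. The project and table names
// are placeholders.
func exampleResume(sessionId string, restClient restclient.RestClient) error {
    session, err := AttachToExistedUploadSession(sessionId, "my_project", "my_table", restClient)
    if err != nil {
        return errors.WithStack(err)
    }

    if session.Status() != UploadStatusNormal {
        return errors.Errorf("session %s is not writable, status: %s", session.Id, session.Status())
    }

    // BlockIds reflects the blocks the server has received so far;
    // committing them finishes the interrupted upload.
    return errors.WithStack(session.Commit(session.BlockIds()))
}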
func (u *UploadSession) Commit(blockIds []int) error {
    err := u.Load()
    if err != nil {
        return errors.WithStack(err)
    }

    errMsgFmt := "blocks server got are %v, blocks uploaded are %v"
    if len(u.blockIds) != len(blockIds) {
        return errors.Errorf(errMsgFmt, u.blockIds, blockIds)
    }

    for _, idWanted := range blockIds {
        found := false
        for _, realId := range u.blockIds {
            if idWanted == realId {
                found = true
                break
            }
        }
        if !found {
            return errors.Errorf(errMsgFmt, u.blockIds, blockIds)
        }
    }

    queryArgs := make(url.Values, 2)
    queryArgs.Set("uploadid", u.Id)
    if u.partitionKey != "" {
        queryArgs.Set("partition", u.partitionKey)
    }

    headers := getCommonHeaders()
    req, err := u.RestClient.NewRequestWithParamsAndHeaders(common.HttpMethod.PostMethod, u.ResourceUrl(), nil, queryArgs, headers)
    if err != nil {
        return errors.WithStack(err)
    }

    err = Retry(func() error {
        res, err := u.RestClient.Do(req)
        if err != nil {
            return err
        }
        if res.StatusCode/100 != 2 {
            return restclient.NewHttpNotOk(res)
        }
        if res.Body != nil {
            _ = res.Body.Close()
        }
        return nil
    })

    return errors.WithStack(err)
}
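// exampleParallelUpload is a hypothetical sketch, not part of the SDK API:
// each goroutine owns one block id, and therefore one writer and one http
// connection, so the blocks upload in parallel. Writing of the actual
// records is elided, and Close on the writer is assumed from
// RecordProtocWriter's implementation elsewhere in this package.
func exampleParallelUpload(session *UploadSession, blockIds []int) error {
    errChan := make(chan error, len(blockIds))

    for _, blockId := range blockIds {
        go func(id int) {
            writer, err := session.OpenRecordWriter(id)
            if err != nil {
                errChan <- err
                return
            }
            // Write this block's records here, then close to flush it.
            errChan <- writer.Close()
        }(blockId)
    }

    for range blockIds {
        if err := <-errChan; err != nil {
            return errors.WithStack(err)
        }
    }

    // Commit only after every writer has closed successfully; the server
    // cross-checks the committed block ids against what it received.
    return errors.WithStack(session.Commit(blockIds))
}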
func (u *UploadSession) loadInformation(req *http.Request) error {
    type ResModel struct {
        Initiated         string         `json:"Initiated"`
        IsOverwrite       bool           `json:"IsOverwrite"`
        MaxFieldSize      int            `json:"MaxFieldSize"`
        Owner             string         `json:"Owner"`
        Schema            schemaResModel `json:"Schema"`
        Status            string         `json:"Status"`
        UploadID          string         `json:"UploadID"`
        QuotaName         string         `json:"QuotaName"`
        UploadedBlockList []struct {
            BlockID     int    `json:"BlockID"`
            CreateTime  int    `json:"CreateTime"`
            Date        string `json:"Date"`
            FileName    string `json:"FileName"`
            RecordCount int    `json:"RecordCount"`
            Version     int64  `json:"Version"`
        } `json:"UploadedBlockList"`
    }

    var resModel ResModel
    err := u.RestClient.DoWithParseFunc(req, func(res *http.Response) error {
        if res.StatusCode/100 != 2 {
            return errors.WithStack(restclient.NewHttpNotOk(res))
        }

        u.shouldTransformDate = res.Header.Get(common.HttpHeaderOdpsDateTransFrom) == "true"

        decoder := json.NewDecoder(res.Body)
        return errors.WithStack(decoder.Decode(&resModel))
    })
    if err != nil {
        return errors.WithStack(err)
    }

    tableSchema, err := resModel.Schema.toTableSchema(u.TableName)
    if err != nil {
        return errors.WithStack(err)
    }

    u.Id = resModel.UploadID
    u.fieldMaxSize = resModel.MaxFieldSize
    if resModel.QuotaName != "" {
        u.QuotaName = resModel.QuotaName
    }
    u.status = UploadStatusFromStr(resModel.Status)
    u.schema = tableSchema
    u.arrowSchema = tableSchema.ToArrowSchema()
    u.blockIds = make([]int, len(resModel.UploadedBlockList))

    for i, b := range resModel.UploadedBlockList {
        u.blockIds[i] = b.BlockID
    }

    return nil
}

func (u *UploadSession) newInitiationRequest() (*http.Request, error) {
    headers := getCommonHeaders()

    resource := u.ResourceUrl()
    queryArgs := make(url.Values, 3)
    queryArgs.Set("uploads", "")
    if u.partitionKey != "" {
        queryArgs.Set("partition", u.partitionKey)
    }
    if u.CreatePartition {
        queryArgs.Set("create_partition", "true")
    }
    if u.Overwrite {
        queryArgs.Set("overwrite", "true")
    }
    if u.QuotaName != "" {
        queryArgs.Set("quotaName", u.QuotaName)
    }

    req, err := u.RestClient.NewRequestWithParamsAndHeaders(common.HttpMethod.PostMethod, resource, nil, queryArgs, headers)
    if err != nil {
        return nil, errors.WithStack(err)
    }

    return req, nil
}

func (u *UploadSession) newLoadRequest() (*http.Request, error) {
    resource := u.ResourceUrl()
    queryArgs := make(url.Values, 2)
    queryArgs.Set("uploadid", u.Id)
    if u.partitionKey != "" {
        queryArgs.Set("partition", u.partitionKey)
    }

    req, err := u.RestClient.NewRequestWithUrlQuery(common.HttpMethod.GetMethod, resource, nil, queryArgs)
    if err != nil {
        return nil, errors.WithStack(err)
    }

    addCommonSessionHttpHeader(req.Header)
    return req, nil
}

func (u *UploadSession) newUploadConnection(blockId int, useArrow bool) (*httpConnection, error) {
    headers := getCommonHeaders()

    queryArgs := make(url.Values, 4)
    queryArgs.Set("uploadid", u.Id)
    queryArgs.Set("blockid", strconv.Itoa(blockId))
    if useArrow {
        queryArgs.Set("arrow", "")
    }
    if u.partitionKey != "" {
        queryArgs.Set("partition", u.partitionKey)
    }

    var reader io.ReadCloser
    var writer io.WriteCloser
    reader, writer = io.Pipe()

    resource := u.ResourceUrl()
    req, err := u.RestClient.NewRequestWithParamsAndHeaders(common.HttpMethod.PutMethod, resource, reader, queryArgs, headers)
    if err != nil {
        return nil, errors.WithStack(err)
    }

    req.Header.Set(common.HttpHeaderContentType, "application/octet-stream")
    if u.Compressor != nil {
        req.Header.Set("Content-Encoding", u.Compressor.Name())
    }

    resChan := make(chan resOrErr)
    go func() {
        res, err := u.RestClient.Do(req)
        resChan <- resOrErr{err: err, res: res}
    }()

    httpConn := newHttpConnection(writer, resChan, u.Compressor)
    return httpConn, nil
}

func UploadStatusFromStr(s string) UploadStatus {
    switch strings.ToUpper(s) {
    case "NORMAL":
        return UploadStatusNormal
    case "CLOSING":
        return UploadStatusClosing
    case "CLOSED":
        return UploadStatusClosed
    case "CANCELED":
        return UploadStatusCanceled
    case "EXPIRED":
        return UploadStatusExpired
    case "CRITICAL":
        return UploadStatusCritical
    case "COMMITTING":
        return UploadStatusCommitting
    default:
        return UploadStatusUnknown
    }
}

func (status UploadStatus) String() string {
    switch status {
    case UploadStatusUnknown:
        return "UNKNOWN"
    case UploadStatusNormal:
        return "NORMAL"
    case UploadStatusClosing:
        return "CLOSING"
    case UploadStatusClosed:
        return "CLOSED"
    case UploadStatusCanceled:
        return "CANCELED"
    case UploadStatusExpired:
        return "EXPIRED"
    case UploadStatusCritical:
        return "CRITICAL"
    case UploadStatusCommitting:
        return "COMMITTING"
    default:
        return "UNKNOWN"
    }
}
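// exampleStatusCheck is a hypothetical sketch, not part of the SDK API: it
// refreshes the session metadata from the server and maps the resulting
// UploadStatus to a go/no-go decision before opening more writers.
func exampleStatusCheck(session *UploadSession) error {
    if err := session.Load(); err != nil {
        return errors.WithStack(err)
    }

    switch session.Status() {
    case UploadStatusNormal:
        // Safe to open more RecordWriters.
        return nil
    case UploadStatusExpired, UploadStatusCanceled, UploadStatusCritical:
        return errors.Errorf("session %s is unusable, status: %s", session.Id, session.Status())
    default:
        // Closing, closed, committing, or unknown: do not write more data.
        return errors.Errorf("session %s is not writable, status: %s", session.Id, session.Status())
    }
}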