// odps/tunnel/stream_upload_session.go
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tunnel
import (
"encoding/json"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"github.com/pkg/errors"
"github.com/aliyun/aliyun-odps-go-sdk/odps/common"
"github.com/aliyun/aliyun-odps-go-sdk/odps/restclient"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
)
// StreamUploadSession represents a streaming upload session against an ODPS
// table via the tunnel service. Create one with CreateStreamUploadSession,
// then obtain a writer with OpenRecordPackWriter.
type StreamUploadSession struct {
	// id is the server-assigned session identifier (sent as "uploadid").
	id          string
	ProjectName string
	// TODO use schema to get the resource url of a table
	SchemaName string
	TableName  string
	// The partition keys used by a session can not contain "'", for example, "region=hangzhou" is a
	// positive case, and "region='hangzhou'" is a negative case. But the partition keys like "region='hangzhou'" are more
	// common, to avoid the users use the error format, the partitionKey of UploadSession is private, it can be set when
	// creating a session or using SetPartitionKey.
	partitionKey string
	// Compressor, when non-nil, compresses the uploaded data and sets the
	// corresponding Content-Encoding header.
	Compressor Compressor
	RestClient restclient.RestClient
	// Columns is sent as the "zorder_columns" query parameter when non-empty.
	Columns []string
	// P2PMode, when true, sends uploads directly to the selected slot's IP
	// instead of the configured endpoint host.
	P2PMode bool
	// CreatePartition, when true, asks the server to create the partition
	// specified by partitionKey if it does not exist.
	CreatePartition bool
	QuotaName       string
	// SlotNum is the requested slot count; sent as a header when > 0.
	SlotNum int
	// slotSelector picks the slot used for each flush, from the slot list
	// returned by the server.
	slotSelector slotSelector
	// schema is the table schema reported by the server at session creation.
	schema        tableschema.TableSchema
	schemaVersion int
	// allowSchemaMismatch controls whether uploaded data may deviate from the
	// table schema; when false, "check_latest_schema=true" is sent on upload.
	allowSchemaMismatch bool
}
// ResourceUrl returns the HTTP resource path of this session's table stream:
// the table resource path with the streams suffix appended.
func (su *StreamUploadSession) ResourceUrl() string {
	builder := common.NewResourceBuilder(su.ProjectName)
	return builder.Table(su.SchemaName, su.TableName) + common.StreamsPath
}
// CreateStreamUploadSession creates a new stream upload session before uploading data.
// The opts can be one or more of:
//   - SessionCfg.WithPartitionKey
//   - SessionCfg.WithSchemaName
//   - SessionCfg.WithDefaultDeflateCompressor, using deflate compressor with default level
//   - SessionCfg.WithDeflateCompressor, using deflate compressor with specific level
//   - SessionCfg.WithSnappyFramedCompressor
//   - SessionCfg.SlotNum, not yet opened to external users
//   - SessionCfg.CreatePartition, create partition if the partition specified by WithPartitionKey does not exist
//   - SessionCfg.AllowSchemaMismatch, whether to allow the schema of uploaded data to be inconsistent
//     with the table schema. The default value is true. When set to false, the Append operation will
//     check the type of uploaded data, and the server will throw a specific exception during Flush.
//   - SessionCfg.Columns, TODO purpose to be clarified
func CreateStreamUploadSession(
	projectName, tableName string,
	restClient restclient.RestClient,
	opts ...Option,
) (*StreamUploadSession, error) {
	cfg := newSessionConfig(opts...)
	session := &StreamUploadSession{
		ProjectName:         projectName,
		SchemaName:          cfg.SchemaName,
		TableName:           tableName,
		partitionKey:        cfg.PartitionKey,
		Compressor:          cfg.Compressor,
		RestClient:          restClient,
		Columns:             cfg.Columns,
		CreatePartition:     cfg.CreatePartition,
		SlotNum:             cfg.SlotNum,
		schemaVersion:       cfg.SchemaVersion,
		allowSchemaMismatch: cfg.AllowSchemaMismatch,
	}
	// Initiate the session on the server, then load its id, schema,
	// schema version and slot list from the response.
	req, err := session.newInitiationRequest()
	if err != nil {
		return nil, errors.WithStack(err)
	}
	if err = session.loadInformation(req, true); err != nil {
		return nil, errors.WithStack(err)
	}
	return session, nil
}
// OpenRecordPackWriter creates a RecordPackStreamWriter bound to this session,
// used to buffer records and flush them to the tunnel server.
func (su *StreamUploadSession) OpenRecordPackWriter() *RecordPackStreamWriter {
	writer := newRecordStreamHttpWriter(su)
	return &writer
}
// Schema returns the table schema reported by the server when the session
// was created.
func (su *StreamUploadSession) Schema() *tableschema.TableSchema {
	return &su.schema
}
// SchemaVersion returns the schema version currently in use by this session.
func (su *StreamUploadSession) SchemaVersion() int {
	return su.schemaVersion
}
// newInitiationRequest builds the POST request that initiates a new stream
// upload session on the tunnel server.
func (su *StreamUploadSession) newInitiationRequest() (*http.Request, error) {
	params := make(url.Values, 7)
	if su.partitionKey != "" {
		params.Set("partition", su.partitionKey)
	}
	if su.CreatePartition {
		// The mere presence of the parameter is the signal; its value is empty.
		params.Set("create_partition", "")
	}
	if len(su.Columns) > 0 {
		params.Set("zorder_columns", strings.Join(su.Columns, ","))
	}
	if su.schemaVersion >= 0 {
		params.Set("schema_version", strconv.Itoa(su.schemaVersion))
	}
	if su.QuotaName != "" {
		params.Set("quotaName", su.QuotaName)
	}
	headers := getCommonHeaders()
	if su.SlotNum > 0 {
		headers["odps-tunnel-slot-num"] = strconv.Itoa(su.SlotNum)
	}
	req, err := su.RestClient.NewRequestWithParamsAndHeaders(
		common.HttpMethod.PostMethod, su.ResourceUrl(), nil, params, headers)
	if err != nil {
		return nil, errors.WithStack(err)
	}
	return req, nil
}
// newReLoadRequest builds the POST request used to re-fetch information
// (notably the slot list) for an existing session identified by su.id.
func (su *StreamUploadSession) newReLoadRequest() (*http.Request, error) {
	params := make(url.Values, 4)
	params.Set("uploadid", su.id)
	if su.schemaVersion >= 0 {
		params.Set("schema_version", strconv.Itoa(su.schemaVersion))
	}
	if su.partitionKey != "" {
		params.Set("partition", su.partitionKey)
	}
	if su.QuotaName != "" {
		params.Set("quotaName", su.QuotaName)
	}
	req, err := su.RestClient.NewRequestWithParamsAndHeaders(
		common.HttpMethod.PostMethod, su.ResourceUrl(), nil, params, getCommonHeaders())
	if err != nil {
		return nil, errors.WithStack(err)
	}
	return req, nil
}
// loadInformation sends req and parses the server's session description.
// When inited is true (initial session creation) it also stores the session
// id, table schema, schema version and quota name on the session; in both
// cases it rebuilds the slot selector from the returned slot list.
//
// Returns an error if the HTTP status is not 2xx, if the session is still
// initiating on the server, or if the response body is malformed.
func (su *StreamUploadSession) loadInformation(req *http.Request, inited bool) error {
	type ResModel struct {
		CompressMode  string          `json:"compress_mode"`
		FileFormat    string          `json:"file_format"`
		Schema        schemaResModel  `json:"schema"`
		SessionName   string          `json:"session_name"`
		Slots         [][]interface{} `json:"slots"`
		Status        string          `json:"status"`
		QuotaName     string          `json:"quota_name"`
		SchemaVersion int             `json:"schema_version"`
	}
	var resModel ResModel
	var requestId string
	err := su.RestClient.DoWithParseFunc(req, func(res *http.Response) error {
		requestId = res.Header.Get("x-odps-request-id")
		if res.StatusCode/100 != 2 {
			return errors.WithStack(restclient.NewHttpNotOk(res))
		}
		decoder := json.NewDecoder(res.Body)
		return errors.WithStack(decoder.Decode(&resModel))
	})
	if err != nil {
		return errors.WithStack(err)
	}
	if resModel.Status == "init" {
		return errors.Errorf("Session is initiating. RequestId:%s Session ID:%s", requestId, resModel.SessionName)
	}
	if inited {
		tableSchema, err := resModel.Schema.toTableSchema(su.TableName)
		if err != nil {
			return errors.WithStack(err)
		}
		su.id = resModel.SessionName
		su.schema = tableSchema
		su.schemaVersion = resModel.SchemaVersion
		if resModel.QuotaName != "" {
			su.QuotaName = resModel.QuotaName
		}
	}
	slots := make([]slot, len(resModel.Slots))
	for i, rawSlot := range resModel.Slots {
		// Each slot entry is a JSON array of [slotId(number), server(string)].
		// Use checked type assertions so a malformed response returns an error
		// instead of panicking.
		if len(rawSlot) < 2 {
			return errors.Errorf("invalid slot entry in session response: %v", rawSlot)
		}
		slotId, ok := rawSlot[0].(float64)
		if !ok {
			return errors.Errorf("invalid slot id in session response: %v", rawSlot[0])
		}
		server, ok := rawSlot[1].(string)
		if !ok {
			return errors.Errorf("invalid slot server in session response: %v", rawSlot[1])
		}
		slots[i], err = newSlot(strconv.Itoa(int(slotId)), server)
		if err != nil {
			return errors.WithStack(err)
		}
	}
	su.slotSelector = newSlotSelect(slots)
	return nil
}
// flushStream uploads the buffered bytes of streamWriter to the tunnel server
// over a fresh HTTP connection using the next slot from the slot selector,
// then reconciles slot-routing information returned by the server. On success
// it returns the server request id and the number of bytes written through
// the connection.
func (su *StreamUploadSession) flushStream(streamWriter *RecordPackStreamWriter, timeout time.Duration) (string, int, error) {
	var reader io.ReadCloser
	var writer io.WriteCloser
	reader, writer = io.Pipe()
	currentSlot := su.slotSelector.NextSlot()
	conn, err := su.newUploadConnection(reader, writer, currentSlot, streamWriter.DataSize(), streamWriter.RecordCount(), timeout)
	if err != nil {
		return "", 0, errors.WithStack(err)
	}
	// write bytes to http uploading connection
	_, err = conn.Writer.Write(streamWriter.buffer.Bytes())
	if err != nil {
		// If the write fails before the HTTP request has been fully sent, the
		// server keeps waiting for the remainder of the request, which would make
		// conn.closeRes() hang: closeRes() reads the HTTP response stream while
		// the server is still waiting for the rest of the request body.
		// NOTE: closing the reader here may cause the writer side to fail and the
		// program to exit, losing the writer's real error cause.
		_ = reader.Close()
		// Explicitly close the open connection, and obtain the actual HTTP error
		// when the server returns a non-200 status.
		closeError := conn.closeRes()
		if closeError != nil {
			return "", 0, errors.WithStack(closeError)
		}
		return "", 0, errors.WithStack(err)
	}
	// close http writer
	err = conn.Writer.Close()
	if err != nil {
		closeError := conn.closeRes()
		if closeError != nil {
			return "", 0, errors.WithStack(closeError)
		}
		return "", 0, errors.WithStack(err)
	}
	// get and close response
	rOrE := <-conn.resChan
	if rOrE.err != nil {
		return "", 0, errors.WithStack(rOrE.err)
	}
	res := rOrE.res
	if res.StatusCode/100 != 2 {
		return "", 0, errors.WithStack(restclient.NewHttpNotOk(res))
	}
	err = res.Body.Close()
	if err != nil {
		return "", 0, errors.WithStack(err)
	}
	// The server may re-route us: a changed slot count requires reloading the
	// whole slot list; a changed server for the current slot is updated in place.
	slotNumStr := res.Header.Get(common.HttpHeaderOdpsSlotNum)
	newSlotServer := res.Header.Get(common.HttpHeaderRoutedServer)
	newSlotNum, err := strconv.Atoi(slotNumStr)
	if err != nil {
		return "", 0, errors.WithMessage(err, "invalid slot num get from http odps-tunnel-slot-num header")
	}
	if newSlotNum != su.slotSelector.SlotNum() {
		err = su.reloadSlotNum()
		if err != nil {
			return "", 0, errors.WithStack(err)
		}
	} else if newSlotServer != currentSlot.Server() {
		err := currentSlot.SetServer(newSlotServer)
		if err != nil {
			return "", 0, errors.WithStack(err)
		}
	}
	return res.Header.Get(common.HttpHeaderOdpsRequestId), conn.bytesCount(), nil
}
// newUploadConnection prepares a PUT request that streams data from reader to
// the tunnel server, and starts a goroutine executing it. The caller writes
// the payload through the returned httpConnection's writer (the other end of
// the reader/writer pipe) and receives the HTTP response, or an error, on the
// connection's result channel.
//
// dataSize < 0 switches to chunked transfer encoding; otherwise the exact
// Content-Length is set. A per-flush timeout > 0 overrides the client's
// default HTTP timeout.
func (su *StreamUploadSession) newUploadConnection(reader io.ReadCloser, writer io.WriteCloser, currentSlot *slot, dataSize int64, recordCount int64, timeout time.Duration) (*httpConnection, error) {
	queryArgs := make(url.Values, 5)
	queryArgs.Set("uploadid", su.id)
	queryArgs.Set("slotid", currentSlot.id)
	if su.schemaVersion >= 0 {
		queryArgs.Set("schema_version", strconv.Itoa(su.schemaVersion))
	}
	if su.partitionKey != "" {
		queryArgs.Set("partition", su.partitionKey)
	}
	if recordCount > 0 {
		queryArgs.Set("record_count", strconv.FormatInt(recordCount, 10))
	}
	if len(su.Columns) > 0 {
		queryArgs.Set("zorder_columns", strings.Join(su.Columns, ","))
	}
	if su.QuotaName != "" {
		queryArgs.Set("quotaName", su.QuotaName)
	}
	// Ask the server to validate against the latest schema only when schema
	// mismatches are not allowed.
	queryArgs.Set("check_latest_schema", strconv.FormatBool(!su.allowSchemaMismatch))
	headers := getCommonHeaders()
	if dataSize < 0 {
		// Unknown payload size: stream with chunked transfer encoding.
		headers[common.HttpHeaderTransferEncoding] = "chunked"
	} else {
		headers[common.HttpHeaderContentLength] = strconv.FormatInt(dataSize, 10)
	}
	headers[common.HttpHeaderContentType] = "application/octet-stream"
	headers[common.HttpHeaderOdpsSlotNum] = strconv.Itoa(su.slotSelector.SlotNum())
	if su.Compressor != nil {
		headers[common.HttpHeaderContentEncoding] = su.Compressor.Name()
	}
	headers[common.HttpHeaderRoutedServer] = currentSlot.Server()
	resource := su.ResourceUrl()
	req, err := su.RestClient.NewRequestWithParamsAndHeaders(common.HttpMethod.PutMethod, resource, reader, queryArgs, headers)
	if err != nil {
		return nil, errors.WithStack(err)
	}
	resChan := make(chan resOrErr)
	go func() {
		endpoint := su.RestClient.Endpoint()
		if su.P2PMode {
			defaultEndpoint, err := url.Parse(su.RestClient.Endpoint())
			if err != nil {
				// Surface a malformed endpoint instead of dereferencing a nil
				// URL (the original code discarded this error).
				resChan <- resOrErr{err: errors.WithStack(err)}
				return
			}
			// In P2P mode, talk directly to the slot's IP with the same scheme
			// as the configured endpoint.
			newUrl := url.URL{
				Scheme: defaultEndpoint.Scheme,
				Host:   currentSlot.ip,
			}
			endpoint = newUrl.String()
		}
		client := restclient.NewOdpsRestClient(su.RestClient, endpoint)
		client.TcpConnectionTimeout = su.RestClient.TcpConnectionTimeout
		client.HttpTimeout = su.RestClient.HttpTimeout
		if timeout > 0 {
			client.HttpTimeout = timeout
		}
		res, err := client.Do(req)
		resChan <- resOrErr{err: err, res: res}
	}()
	httpConn := newHttpConnection(writer, resChan, su.Compressor)
	return httpConn, nil
}
// reloadSlotNum re-fetches the session information from the server to refresh
// the slot list, without touching the already-loaded schema (inited=false).
func (su *StreamUploadSession) reloadSlotNum() error {
	req, err := su.newReLoadRequest()
	if err != nil {
		return errors.WithStack(err)
	}
	err = su.loadInformation(req, false)
	return errors.WithStack(err)
}
// getCommonHeaders returns the headers attached to every tunnel request:
// the date-transform version, the tunnel protocol version, and the flag
// announcing SDK support for schema evolution.
func getCommonHeaders() map[string]string {
	return map[string]string{
		common.HttpHeaderOdpsDateTransFrom:             DateTransformVersion,
		common.HttpHeaderOdpsTunnelVersion:             Version,
		common.HttpHeaderOdpsSdkSupportSchemaEvolution: "true",
	}
}