odps/tunnel/common.go (111 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tunnel
import (
"bytes"
"io"
"net/http"
"time"
"github.com/pkg/errors"
"github.com/aliyun/aliyun-odps-go-sdk/odps/common"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
)
// addCommonSessionHttpHeader appends the two headers shared by tunnel session
// requests to header: the ODPS date-transform version and the tunnel
// protocol version. Existing values for these keys are kept (Add, not Set).
func addCommonSessionHttpHeader(header http.Header) {
	for _, kv := range [...][2]string{
		{common.HttpHeaderOdpsDateTransFrom, DateTransformVersion},
		{common.HttpHeaderOdpsTunnelVersion, Version},
	} {
		header.Add(kv[0], kv[1])
	}
}
// columnResModel is the JSON shape of one column entry inside a schema
// payload (presumably the tunnel server's schema response — confirm against
// the API). toTableSchema converts each entry into a tableschema.Column:
// Type is parsed with datatype.ParseDataType, Nullable is inverted into
// NotNull, and ColumnId is decoded but not carried over.
type columnResModel struct {
	ColumnId     string `json:"column_id"`
	Comment      string `json:"comment"`
	DefaultValue string `json:"default_value"`
	Name         string `json:"name"`
	Nullable     bool   `json:"nullable"`
	Type         string `json:"type"`
}
// schemaResModel is the JSON shape of a table schema payload (presumably the
// tunnel server's schema response — confirm against the API).
//
// toTableSchema maps Columns to TableSchema.Columns and PartitionKeys to
// TableSchema.PartitionColumns. NOTE(review): the "IsVirtualView" key is
// capitalized, unlike the other keys; it presumably mirrors the payload, so
// confirm with the server format before changing it.
type schemaResModel struct {
	IsVirtualView bool             `json:"IsVirtualView"`
	Columns       []columnResModel `json:"columns"`
	PartitionKeys []columnResModel `json:"partitionKeys"`
}
// toTableSchema converts the decoded schema payload into a
// tableschema.TableSchema named tableName.
//
// Each column's and partition key's type string is parsed with
// datatype.ParseDataType; Nullable is inverted into NotNull. If any type
// fails to parse, a zero TableSchema is returned together with an error that
// names the offending column and its type string. With no columns (or no
// partition keys) the corresponding slice is empty but non-nil.
func (s *schemaResModel) toTableSchema(tableName string) (tableschema.TableSchema, error) {
	// convert maps raw columns to tableschema.Columns; kind only feeds the
	// error message so callers can tell regular columns from partition keys.
	convert := func(raw []columnResModel, kind string) ([]tableschema.Column, error) {
		out := make([]tableschema.Column, len(raw))
		for i, rc := range raw {
			dataType, err := datatype.ParseDataType(rc.Type)
			if err != nil {
				// Wrap once: records a single stack trace and says which
				// column's type string the parser rejected.
				return nil, errors.Wrapf(err, "parse type %q of %s %q", rc.Type, kind, rc.Name)
			}
			out[i] = tableschema.Column{
				Name:         rc.Name,
				Type:         dataType,
				Comment:      rc.Comment,
				NotNull:      !rc.Nullable,
				DefaultValue: rc.DefaultValue,
			}
		}
		return out, nil
	}

	columns, err := convert(s.Columns, "column")
	if err != nil {
		return tableschema.TableSchema{}, err
	}
	partitionColumns, err := convert(s.PartitionKeys, "partition column")
	if err != nil {
		return tableschema.TableSchema{}, err
	}

	return tableschema.TableSchema{
		TableName:        tableName,
		IsVirtualView:    s.IsVirtualView,
		Columns:          columns,
		PartitionColumns: partitionColumns,
	}, nil
}
// min returns the smaller of x and y; when they are equal it returns x.
// It shadows the Go 1.21 builtin of the same name within this package.
func min(x, y int) int {
	if y < x {
		return y
	}
	return x
}
// Retry calls f up to three times, stopping at the first nil error, and
// returns nil on success or the error from the last attempt otherwise.
//
// Between attempts it sleeps with exponential backoff: 1s after the first
// failure and 2s after the second. It does not sleep after the final
// attempt, so a call that keeps failing returns as soon as it gives up.
func Retry(f func() error) error {
	// TODO: use tunnel retry strategy and add retry logger
	return retryWithBackoff(3, time.Second, f)
}

// retryWithBackoff calls f up to attempts times (always at least once). After
// each failed attempt except the last, it sleeps for a delay that starts at
// baseDelay and doubles every time. It returns nil as soon as f succeeds,
// otherwise the error from the final attempt.
func retryWithBackoff(attempts int, baseDelay time.Duration, f func() error) error {
	delay := baseDelay
	for attempt := 1; ; attempt++ {
		err := f()
		if err == nil || attempt >= attempts {
			return err
		}
		time.Sleep(delay)
		delay *= 2
	}
}
// bufWriter adapts a *bytes.Buffer to io.WriteCloser. Writes go straight to
// the buffer, and Close is a no-op, so the buffered bytes stay readable
// after Close.
type bufWriter struct {
	buf *bytes.Buffer
}

// Compile-time check that *bufWriter satisfies io.WriteCloser.
var _ io.WriteCloser = (*bufWriter)(nil)

// Write appends b to the underlying buffer and reports what the buffer
// reports.
func (w *bufWriter) Write(b []byte) (n int, err error) {
	n, err = w.buf.Write(b)
	return n, err
}

// Close does nothing and always returns nil.
func (w *bufWriter) Close() error { return nil }
// bytesRecordWriter records how much data a tunnel upload sends over HTTP.
// For compressed data, the n returned by the compressor's Write cannot be
// used to tell the compressed size, so the count is taken underneath the
// compressor instead:
//  1. wrap the HTTP connection in a bytesRecordWriter;
//  2. wrap the bytesRecordWriter in the compressor's writer;
//  3. after compressing data, the compressor calls bytesRecordWriter.Write;
//  4. so the bytes bytesRecordWriter.Write receives are the compressed data;
//  5. bytesRecordWriter.Write adds the size of what it received to its count.
type bytesRecordWriter struct {
	writer io.WriteCloser
	bytesN int
}

// Compile-time check that *bytesRecordWriter satisfies io.WriteCloser.
var _ io.WriteCloser = (*bytesRecordWriter)(nil)

// newBytesRecordWriter returns a bytesRecordWriter that forwards to writer,
// with its byte count starting at zero.
func newBytesRecordWriter(writer io.WriteCloser) *bytesRecordWriter {
	w := new(bytesRecordWriter)
	w.writer = writer
	return w
}

// BytesN reports the total number of bytes the wrapped writer has accepted
// so far.
func (w *bytesRecordWriter) BytesN() int { return w.bytesN }

// Write forwards b to the wrapped writer and adds the byte count it reports
// to the running total. On a partial write with an error, the partial count
// is still added.
func (w *bytesRecordWriter) Write(b []byte) (n int, err error) {
	n, err = w.writer.Write(b)
	w.bytesN += n
	return n, err
}

// Close closes the wrapped writer; the recorded count is left unchanged.
func (w *bytesRecordWriter) Close() error {
	return w.writer.Close()
}