oss/client.go (1,268 lines of code) (raw):
package oss
import (
"bytes"
"context"
"crypto/md5"
"encoding/base64"
"encoding/json"
"encoding/xml"
"fmt"
"hash"
"io"
"net"
"net/http"
"net/url"
"reflect"
"strconv"
"strings"
"time"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/retry"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/signer"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/transport"
)
type Options struct {
Product string
Region string
Endpoint *url.URL
RetryMaxAttempts *int
Retryer retry.Retryer
Signer signer.Signer
CredentialsProvider credentials.CredentialsProvider
HttpClient HTTPClient
ResponseHandlers []func(*http.Response) error
UrlStyle UrlStyleType
FeatureFlags FeatureFlagsType
OpReadWriteTimeout *time.Duration
AuthMethod *AuthMethodType
AdditionalHeaders []string
}
func (c Options) Copy() Options {
to := c
to.ResponseHandlers = make([]func(*http.Response) error, len(c.ResponseHandlers))
copy(to.ResponseHandlers, c.ResponseHandlers)
return to
}
func OpReadWriteTimeout(value time.Duration) func(*Options) {
return func(o *Options) {
o.OpReadWriteTimeout = Ptr(value)
}
}
type innerOptions struct {
BwTokenBuckets BwTokenBuckets
// A clock offset that how much client time is different from server time
ClockOffset time.Duration
// Logger
Log Logger
// UserAgent
UserAgent string
}
type Client struct {
options Options
inner innerOptions
}
func NewClient(cfg *Config, optFns ...func(*Options)) *Client {
options := Options{
Product: DefaultProduct,
Region: ToString(cfg.Region),
RetryMaxAttempts: cfg.RetryMaxAttempts,
Retryer: cfg.Retryer,
CredentialsProvider: cfg.CredentialsProvider,
HttpClient: cfg.HttpClient,
FeatureFlags: FeatureFlagsDefault,
AdditionalHeaders: cfg.AdditionalHeaders,
}
inner := innerOptions{
Log: NewLogger(ToInt(cfg.LogLevel), cfg.LogPrinter),
UserAgent: buildUserAgent(cfg),
}
resolveEndpoint(cfg, &options)
resolveRetryer(cfg, &options)
resolveHTTPClient(cfg, &options, &inner)
resolveSigner(cfg, &options)
resolveUrlStyle(cfg, &options)
resolveFeatureFlags(cfg, &options)
resolveCloudBox(cfg, &options)
for _, fn := range optFns {
fn(&options)
}
client := &Client{
options: options,
inner: inner,
}
return client
}
func resolveEndpoint(cfg *Config, o *Options) {
disableSSL := ToBool(cfg.DisableSSL)
endpoint := ToString(cfg.Endpoint)
region := ToString(cfg.Region)
if len(endpoint) > 0 {
endpoint = addEndpointScheme(endpoint, disableSSL)
} else if isValidRegion(region) {
endpoint = endpointFromRegion(
region,
disableSSL,
func() EndpointType {
if ToBool(cfg.UseInternalEndpoint) {
return EndpointInternal
} else if ToBool(cfg.UseDualStackEndpoint) {
return EndpointDualStack
} else if ToBool(cfg.UseAccelerateEndpoint) {
return EndpointAccelerate
}
return EndpointPublic
}(),
)
}
if endpoint == "" {
return
}
o.Endpoint, _ = url.Parse(endpoint)
}
func resolveRetryer(_ *Config, o *Options) {
if o.Retryer != nil {
return
}
o.Retryer = retry.NewStandard()
}
func resolveHTTPClient(cfg *Config, o *Options, inner *innerOptions) {
if o.HttpClient != nil {
return
}
//config in http.Transport
custom := []func(*http.Transport){}
if cfg.InsecureSkipVerify != nil {
custom = append(custom, transport.InsecureSkipVerify(*cfg.InsecureSkipVerify))
}
if cfg.ProxyFromEnvironment != nil && *cfg.ProxyFromEnvironment {
custom = append(custom, transport.ProxyFromEnvironment())
}
if cfg.ProxyHost != nil {
if url, err := url.Parse(*cfg.ProxyHost); err == nil {
custom = append(custom, transport.HttpProxy(url))
}
}
//config in transport package
tcfg := &transport.Config{}
if cfg.ConnectTimeout != nil {
tcfg.ConnectTimeout = cfg.ConnectTimeout
}
if cfg.ReadWriteTimeout != nil {
tcfg.ReadWriteTimeout = cfg.ReadWriteTimeout
}
if cfg.EnabledRedirect != nil {
tcfg.EnabledRedirect = cfg.EnabledRedirect
}
if cfg.UploadBandwidthlimit != nil {
value := *cfg.UploadBandwidthlimit * 1024
tb := newBwTokenBucket(value)
tcfg.PostWrite = append(tcfg.PostWrite, func(n int, _ error) {
tb.LimitBandwidth(n)
})
inner.BwTokenBuckets[BwTokenBucketSlotTx] = tb
}
if cfg.DownloadBandwidthlimit != nil {
value := *cfg.DownloadBandwidthlimit * 1024
tb := newBwTokenBucket(value)
tcfg.PostRead = append(tcfg.PostRead, func(n int, _ error) {
tb.LimitBandwidth(n)
})
inner.BwTokenBuckets[BwTokenBucketSlotRx] = tb
}
o.HttpClient = transport.NewHttpClient(tcfg, custom...)
}
func resolveSigner(cfg *Config, o *Options) {
if o.Signer != nil {
return
}
ver := DefaultSignatureVersion
if cfg.SignatureVersion != nil {
ver = *cfg.SignatureVersion
}
switch ver {
case SignatureVersionV1:
o.Signer = &signer.SignerV1{}
default:
o.Signer = &signer.SignerV4{}
}
}
func resolveUrlStyle(cfg *Config, o *Options) {
if cfg.UseCName != nil && *cfg.UseCName {
o.UrlStyle = UrlStyleCName
} else if cfg.UsePathStyle != nil && *cfg.UsePathStyle {
o.UrlStyle = UrlStylePath
} else {
o.UrlStyle = UrlStyleVirtualHosted
}
// if the endpoint is ip, set to path-style
if o.Endpoint != nil {
if ip := net.ParseIP(o.Endpoint.Hostname()); ip != nil {
o.UrlStyle = UrlStylePath
}
}
}
func resolveFeatureFlags(cfg *Config, o *Options) {
if ToBool(cfg.DisableDownloadCRC64Check) {
o.FeatureFlags = o.FeatureFlags & ^FeatureEnableCRC64CheckDownload
}
if ToBool(cfg.DisableUploadCRC64Check) {
o.FeatureFlags = o.FeatureFlags & ^FeatureEnableCRC64CheckUpload
}
}
func resolveCloudBox(cfg *Config, o *Options) {
if cfg.CloudBoxId != nil {
o.Region = ToString(cfg.CloudBoxId)
o.Product = CloudBoxProduct
return
}
if !ToBool(cfg.EnableAutoDetectCloudBoxId) {
return
}
if o.Endpoint == nil {
return
}
//cb-***.{region}.oss-cloudbox-control.aliyuncs.com
//cb-***.{region}.oss-cloudbox.aliyuncs.com
host := o.Endpoint.Host
if !(strings.HasSuffix(host, ".oss-cloudbox.aliyuncs.com") ||
strings.HasSuffix(host, ".oss-cloudbox-control.aliyuncs.com")) {
return
}
keys := strings.Split(host, ".")
if keys == nil ||
len(keys) != 5 ||
!strings.HasPrefix(keys[0], "cb-") {
return
}
o.Region = keys[0]
o.Product = CloudBoxProduct
}
func buildUserAgent(cfg *Config) string {
if cfg.UserAgent == nil {
return defaultUserAgent
}
return fmt.Sprintf("%s/%s", defaultUserAgent, ToString(cfg.UserAgent))
}
func (c *Client) invokeOperation(ctx context.Context, input *OperationInput, optFns []func(*Options)) (output *OperationOutput, err error) {
if c.getLogLevel() >= LogInfo {
c.inner.Log.Infof("InvokeOperation Start: input[%p], OpName:%s, Bucket:%s, Key:%s",
input, input.OpName,
ToString(input.Bucket), ToString(input.Key))
defer func() {
c.inner.Log.Infof("InvokeOperation End: input[%p], OpName:%s, output:'%v', err:'%v'",
input, input.OpName,
c.dumpOperationOutput(output), err)
}()
}
options := c.options.Copy()
opOpt := Options{}
for _, fn := range optFns {
fn(&opOpt)
}
applyOperationOpt(&options, &opOpt)
applyOperationMetadata(input, &options)
ctx = applyOperationContext(ctx, &options)
output, err = c.sendRequest(ctx, input, &options)
if err != nil {
return output, &OperationError{
name: input.OpName,
err: err}
}
return output, err
}
func (c *Client) sendRequest(ctx context.Context, input *OperationInput, opts *Options) (output *OperationOutput, err error) {
var request *http.Request
var response *http.Response
if c.getLogLevel() >= LogInfo {
c.inner.Log.Infof("sendRequest Start: input[%p]", input)
defer func() {
c.inner.Log.Infof("sendRequest End: input[%p], http.Request[%p], http.Response[%p]", input, request, response)
}()
}
// covert input into httpRequest
if !isValidEndpoint(opts.Endpoint) {
return output, NewErrParamInvalid("Endpoint")
}
var writers []io.Writer
// tracker in OperationMetaData
for _, w := range input.OpMetadata.Values(OpMetaKeyRequestBodyTracker) {
if ww, ok := w.(io.Writer); ok {
writers = append(writers, ww)
}
}
// host & path
host, path := buildURL(input, opts)
strUrl := fmt.Sprintf("%s://%s%s", opts.Endpoint.Scheme, host, path)
// querys
if len(input.Parameters) > 0 {
var buf bytes.Buffer
for k, v := range input.Parameters {
if buf.Len() > 0 {
buf.WriteByte('&')
}
buf.WriteString(url.QueryEscape(k))
if len(v) > 0 {
buf.WriteString("=" + strings.Replace(url.QueryEscape(v), "+", "%20", -1))
}
}
strUrl += "?" + buf.String()
}
request, err = http.NewRequestWithContext(ctx, input.Method, strUrl, nil)
if err != nil {
return output, err
}
// headers
for k, v := range input.Headers {
if len(k) > 0 && len(v) > 0 {
request.Header.Add(k, v)
}
}
request.Header.Set("User-Agent", c.inner.UserAgent)
// body
var body io.Reader
if input.Body == nil {
body = strings.NewReader("")
} else {
body = input.Body
}
var length int64
if clen := request.Header.Get("Content-Length"); clen != "" {
length, _ = strconv.ParseInt(clen, 10, 64)
} else {
length = GetReaderLen(body)
}
if length >= 0 {
request.ContentLength = length
}
request.Body = TeeReadNopCloser(body, writers...)
//signing context
subResource, _ := input.OpMetadata.Get(signer.SubResource).([]string)
clockOffset := c.inner.ClockOffset
signingCtx := &signer.SigningContext{
Product: Ptr(opts.Product),
Region: Ptr(opts.Region),
Bucket: input.Bucket,
Key: input.Key,
Request: request,
SubResource: subResource,
AuthMethodQuery: opts.AuthMethod != nil && *opts.AuthMethod == AuthMethodQuery,
ClockOffset: clockOffset,
AdditionalHeaders: opts.AdditionalHeaders,
}
if date := request.Header.Get(HeaderOssDate); date != "" {
signingCtx.Time, _ = http.ParseTime(date)
} else if signTime, ok := input.OpMetadata.Get(signer.SignTime).(time.Time); ok {
signingCtx.Time = signTime
}
// send http request
response, err = c.sendHttpRequest(ctx, signingCtx, opts)
if err != nil {
return output, err
}
// covert http response into output context
output = &OperationOutput{
Input: input,
Status: response.Status,
StatusCode: response.StatusCode,
Body: response.Body,
Headers: response.Header,
httpRequest: request,
}
// save other info by Metadata filed, ex. retry detail info
//output.OpMetadata.Set(...)
if signingCtx.AuthMethodQuery {
output.OpMetadata.Set(signer.SignTime, signingCtx.Time)
}
if signingCtx.ClockOffset != clockOffset {
c.inner.ClockOffset = signingCtx.ClockOffset
}
return output, err
}
func (c *Client) sendHttpRequest(ctx context.Context, signingCtx *signer.SigningContext, opts *Options) (response *http.Response, err error) {
request := signingCtx.Request
retryer := opts.Retryer
maxAttempts := c.retryMaxAttempts(opts)
body, _ := request.Body.(*teeReadNopCloser)
resetTime := signingCtx.Time.IsZero()
body.Mark()
for tries := 1; tries <= maxAttempts; tries++ {
if tries > 1 {
delay, err := retryer.RetryDelay(tries, err)
if err != nil {
break
}
if err = sleepWithContext(ctx, delay); err != nil {
err = &CanceledError{Err: err}
break
}
if err = body.Reset(); err != nil {
break
}
if resetTime {
signingCtx.Time = time.Time{}
}
c.inner.Log.Infof("Attempt retry, request[%p], tries:%v, retry delay:%v", request, tries, delay)
}
if response, err = c.sendHttpRequestOnce(ctx, signingCtx, opts); err == nil {
break
}
c.postSendHttpRequestOnce(signingCtx, response, err)
if isContextError(ctx, &err) {
err = &CanceledError{Err: err}
break
}
if !body.IsSeekable() {
break
}
if !retryer.IsErrorRetryable(err) {
break
}
}
return response, err
}
func (c *Client) sendHttpRequestOnce(ctx context.Context, signingCtx *signer.SigningContext, opts *Options) (
response *http.Response, err error,
) {
if c.getLogLevel() > LogInfo {
c.inner.Log.Infof("sendHttpRequestOnce Start, http.Request[%p]", signingCtx.Request)
defer func() {
c.inner.Log.Infof("sendHttpRequestOnce End, http.Request[%p], response[%p], err:%v", signingCtx.Request, response, err)
}()
}
if _, anonymous := opts.CredentialsProvider.(*credentials.AnonymousCredentialsProvider); !anonymous {
cred, err := opts.CredentialsProvider.GetCredentials(ctx)
if err != nil {
return response, err
}
signingCtx.Credentials = &cred
if err = c.options.Signer.Sign(ctx, signingCtx); err != nil {
return response, err
}
c.inner.Log.Debugf("sendHttpRequestOnce::Sign request[%p], StringToSign:%s", signingCtx.Request, signingCtx.StringToSign)
}
c.logHttpPRequet(signingCtx.Request)
if response, err = opts.HttpClient.Do(signingCtx.Request); err != nil {
return response, err
}
c.logHttpResponse(signingCtx.Request, response)
for _, fn := range opts.ResponseHandlers {
if err = fn(response); err != nil {
return response, err
}
}
return response, err
}
func (c *Client) postSendHttpRequestOnce(signingCtx *signer.SigningContext, _ *http.Response, err error) {
if err != nil {
switch e := err.(type) {
case *ServiceError:
if c.hasFeature(FeatureCorrectClockSkew) &&
e.Code == "RequestTimeTooSkewed" &&
!e.Timestamp.IsZero() {
signingCtx.ClockOffset = e.Timestamp.Sub(signingCtx.Time)
c.inner.Log.Warnf("Got RequestTimeTooSkewed error, correct clock request[%p], ClockOffset:%v, Server Time:%v, Client time:%v",
signingCtx.Request, signingCtx.ClockOffset, e.Timestamp, signingCtx.Time)
}
}
}
}
func buildURL(input *OperationInput, opts *Options) (host string, path string) {
if input == nil || opts == nil || opts.Endpoint == nil {
return host, path
}
var paths []string
if input.Bucket == nil {
host = opts.Endpoint.Host
} else {
switch opts.UrlStyle {
default: // UrlStyleVirtualHosted
host = fmt.Sprintf("%s.%s", *input.Bucket, opts.Endpoint.Host)
case UrlStylePath:
host = opts.Endpoint.Host
paths = append(paths, *input.Bucket)
if input.Key == nil {
paths = append(paths, "")
}
case UrlStyleCName:
host = opts.Endpoint.Host
}
}
if input.Key != nil {
paths = append(paths, escapePath(*input.Key, false))
}
return host, ("/" + strings.Join(paths, "/"))
}
func serviceErrorResponseHandler(response *http.Response) error {
if response.StatusCode/100 == 2 {
return nil
}
return tryConvertServiceError(response)
}
func callbackErrorResponseHandler(response *http.Response) error {
if response.StatusCode == 203 &&
response.Request.Header.Get(HeaderOssCallback) != "" {
return tryConvertServiceError(response)
}
return nil
}
func tryConvertServiceError(response *http.Response) (err error) {
var respBody []byte
var body []byte
timestamp, err := time.Parse(http.TimeFormat, response.Header.Get("Date"))
if err != nil {
timestamp = time.Now()
}
defer response.Body.Close()
respBody, err = io.ReadAll(response.Body)
body = respBody
if len(respBody) == 0 && len(response.Header.Get(HeaderOssERR)) > 0 {
body, err = base64.StdEncoding.DecodeString(response.Header.Get(HeaderOssERR))
if err != nil {
body = respBody
}
}
se := &ServiceError{
StatusCode: response.StatusCode,
Code: "BadErrorResponse",
RequestID: response.Header.Get(HeaderOssRequestID),
Timestamp: timestamp,
RequestTarget: fmt.Sprintf("%s %s", response.Request.Method, response.Request.URL),
Snapshot: body,
Headers: response.Header,
}
if err != nil {
se.Message = fmt.Sprintf("The body of the response was not readable, due to :%s", err.Error())
return se
}
err = xml.Unmarshal(body, &se)
if err != nil {
len := len(body)
if len > 256 {
len = 256
}
se.Message = fmt.Sprintf("Failed to parse xml from response body due to: %s. With part response body %s.", err.Error(), string(body[:len]))
return se
}
return se
}
func nonStreamResponseHandler(response *http.Response) error {
body := response.Body
if body == nil {
return nil
}
defer body.Close()
val, err := io.ReadAll(body)
if err == nil {
response.Body = io.NopCloser(bytes.NewReader(val))
}
return err
}
func checkResponseHeaderCRC64(ccrc string, header http.Header) (err error) {
if scrc := header.Get(HeaderOssCRC64); scrc != "" {
if scrc != ccrc {
return fmt.Errorf("crc is inconsistent, client %s, server %s", ccrc, scrc)
}
}
return nil
}
func applyOperationOpt(c *Options, op *Options) {
if c == nil || op == nil {
return
}
if op.Endpoint != nil {
c.Endpoint = op.Endpoint
}
if ToInt(op.RetryMaxAttempts) > 0 {
c.RetryMaxAttempts = op.RetryMaxAttempts
}
if op.Retryer != nil {
c.Retryer = op.Retryer
}
if c.Retryer == nil {
c.Retryer = retry.NopRetryer{}
}
if op.OpReadWriteTimeout != nil {
c.OpReadWriteTimeout = op.OpReadWriteTimeout
}
if op.HttpClient != nil {
c.HttpClient = op.HttpClient
}
if op.AuthMethod != nil {
c.AuthMethod = op.AuthMethod
}
//response handler
handlers := []func(*http.Response) error{
serviceErrorResponseHandler,
}
handlers = append(handlers, c.ResponseHandlers...)
handlers = append(handlers, op.ResponseHandlers...)
c.ResponseHandlers = handlers
}
func applyOperationContext(ctx context.Context, c *Options) context.Context {
if ctx == nil || c.OpReadWriteTimeout == nil {
return ctx
}
return context.WithValue(ctx, "OpReadWriteTimeout", c.OpReadWriteTimeout)
}
func applyOperationMetadata(input *OperationInput, c *Options) {
for _, h := range input.OpMetadata.Values(OpMetaKeyResponsHandler) {
if hh, ok := h.(func(*http.Response) error); ok {
c.ResponseHandlers = append(c.ResponseHandlers, hh)
}
}
}
// fieldInfo holds details for the input/output of a single field.
type fieldInfo struct {
idx int
flags int
}
const (
fRequire int = 1 << iota
fTypeUsermeta
fTypeXml
fTypeTime
)
func parseFiledFlags(tokens []string) int {
var flags int = 0
for _, token := range tokens {
switch token {
case "required":
flags |= fRequire
case "time":
flags |= fTypeTime
case "xml":
flags |= fTypeXml
case "usermeta":
flags |= fTypeUsermeta
}
}
return flags
}
func validateInput(input *OperationInput) error {
if input == nil {
return NewErrParamNull("OperationInput")
}
if input.Bucket != nil && !isValidBucketName(input.Bucket) {
return NewErrParamInvalid("OperationInput.Bucket")
}
if input.Key != nil && !isValidObjectName(input.Key) {
return NewErrParamInvalid("OperationInput.Key")
}
if !isValidMethod(input.Method) {
return NewErrParamInvalid("OperationInput.Method")
}
return nil
}
func (c *Client) marshalInput(request any, input *OperationInput, handlers ...func(any, *OperationInput) error) error {
// merge common fields
if cm, ok := request.(RequestCommonInterface); ok {
h, p, b := cm.GetCommonFileds()
// headers
if len(h) > 0 {
if input.Headers == nil {
input.Headers = map[string]string{}
}
for k, v := range h {
input.Headers[k] = v
}
}
// parameters
if len(p) > 0 {
if input.Parameters == nil {
input.Parameters = map[string]string{}
}
for k, v := range p {
input.Parameters[k] = v
}
}
// body
input.Body = b
}
val := reflect.ValueOf(request)
switch val.Kind() {
case reflect.Pointer, reflect.Interface:
if val.IsNil() {
return nil
}
val = val.Elem()
}
if val.Kind() != reflect.Struct || input == nil {
return nil
}
t := val.Type()
for k := 0; k < t.NumField(); k++ {
if tag, ok := t.Field(k).Tag.Lookup("input"); ok {
// header|query|body,filed_name,[required,time,usermeta...]
v := val.Field(k)
var flags int = 0
tokens := strings.Split(tag, ",")
if len(tokens) < 2 {
continue
}
// parse field flags
if len(tokens) > 2 {
flags = parseFiledFlags(tokens[2:])
}
// check required flag
if isEmptyValue(v) {
if flags&fRequire != 0 {
return NewErrParamRequired(t.Field(k).Name)
}
continue
}
switch tokens[0] {
case "query":
if input.Parameters == nil {
input.Parameters = map[string]string{}
}
if v.Kind() == reflect.Pointer {
v = v.Elem()
}
input.Parameters[tokens[1]] = fmt.Sprintf("%v", v.Interface())
case "header":
if input.Headers == nil {
input.Headers = map[string]string{}
}
if v.Kind() == reflect.Pointer {
v = v.Elem()
}
if flags&fTypeUsermeta != 0 {
if m, ok := v.Interface().(map[string]string); ok {
for k, v := range m {
input.Headers[tokens[1]+k] = v
}
}
} else {
input.Headers[tokens[1]] = fmt.Sprintf("%v", v.Interface())
}
case "body":
if flags&fTypeXml != 0 {
var b bytes.Buffer
if err := xml.NewEncoder(&b).EncodeElement(
v.Interface(),
xml.StartElement{Name: xml.Name{Local: tokens[1]}}); err != nil {
return &SerializationError{
Err: err,
}
}
input.Body = bytes.NewReader(b.Bytes())
} else {
if r, ok := v.Interface().(io.Reader); ok {
input.Body = r
} else {
return NewErrParamTypeNotSupport(t.Field(k).Name)
}
}
}
}
}
if err := validateInput(input); err != nil {
return err
}
for _, h := range handlers {
if err := h(request, input); err != nil {
return err
}
}
return nil
}
func marshalDeleteObjects(request any, input *OperationInput) error {
var builder strings.Builder
delRequest := request.(*DeleteMultipleObjectsRequest)
builder.WriteString("<Delete>")
builder.WriteString("<Quiet>")
builder.WriteString(strconv.FormatBool(delRequest.Quiet))
builder.WriteString("</Quiet>")
if len(delRequest.Objects) > 0 {
for _, object := range delRequest.Objects {
builder.WriteString("<Object>")
if object.Key != nil {
builder.WriteString("<Key>")
builder.WriteString(escapeXml(*object.Key))
builder.WriteString("</Key>")
}
if object.VersionId != nil {
builder.WriteString("<VersionId>")
builder.WriteString(*object.VersionId)
builder.WriteString("</VersionId>")
}
builder.WriteString("</Object>")
}
} else {
return NewErrParamInvalid("Objects")
}
builder.WriteString("</Delete>")
input.Body = strings.NewReader(builder.String())
return nil
}
func discardBody(result any, output *OperationOutput) error {
var err error
if output.Body != nil {
defer output.Body.Close()
_, err = io.Copy(io.Discard, output.Body)
}
return err
}
func unmarshalBodyXml(result any, output *OperationOutput) error {
var err error
var body []byte
if output.Body != nil {
defer output.Body.Close()
if body, err = io.ReadAll(output.Body); err != nil {
return err
}
}
if len(body) > 0 {
if err = xml.Unmarshal(body, result); err != nil {
err = &DeserializationError{
Err: err,
Snapshot: body,
}
}
}
return err
}
func unmarshalBodyXmlMix(result any, output *OperationOutput) error {
var err error
var body []byte
if output.Body != nil {
defer output.Body.Close()
if body, err = io.ReadAll(output.Body); err != nil {
return err
}
}
if len(body) == 0 {
return nil
}
val := reflect.ValueOf(result)
switch val.Kind() {
case reflect.Pointer, reflect.Interface:
if val.IsNil() {
return nil
}
val = val.Elem()
}
if val.Kind() != reflect.Struct || output == nil {
return nil
}
t := val.Type()
idx := -1
for k := 0; k < t.NumField(); k++ {
if tag, ok := t.Field(k).Tag.Lookup("output"); ok {
tokens := strings.Split(tag, ",")
if len(tokens) < 2 {
continue
}
// header|query|body,filed_name,[required,time,usermeta...]
switch tokens[0] {
case "body":
idx = k
break
}
}
}
if idx >= 0 {
dst := val.Field(idx)
if dst.IsNil() {
dst.Set(reflect.New(dst.Type().Elem()))
}
err = xml.Unmarshal(body, dst.Interface())
} else {
err = xml.Unmarshal(body, result)
}
if err != nil {
err = &DeserializationError{
Err: err,
Snapshot: body,
}
}
return err
}
func unmarshalBodyXmlVersions(result any, output *OperationOutput) error {
var err error
var body []byte
if output.Body != nil {
defer output.Body.Close()
if body, err = io.ReadAll(output.Body); err != nil {
return err
}
}
if len(body) > 0 {
oldStrings := []string{"<Version>", "</Version>", "<DeleteMarker>", "</DeleteMarker>"}
newStrings := []string{"<ObjectMix>", "</ObjectMix>", "<ObjectMix>", "</ObjectMix>"}
replacedData := string(body)
for i := range oldStrings {
replacedData = strings.Replace(replacedData, oldStrings[i], newStrings[i], -1)
}
if err = xml.Unmarshal([]byte(replacedData), result); err != nil {
err = &DeserializationError{
Err: err,
Snapshot: body,
}
}
}
return err
}
func unmarshalBodyDefault(result any, output *OperationOutput) error {
var err error
var body []byte
if output.Body != nil {
defer output.Body.Close()
if body, err = io.ReadAll(output.Body); err != nil {
return err
}
}
// extract body
if len(body) > 0 {
contentType := output.Headers.Get("Content-Type")
switch contentType {
case "application/xml":
err = xml.Unmarshal(body, result)
case "application/json":
err = json.Unmarshal(body, result)
case "application/json;charset=utf-8":
err = json.Unmarshal(body, result)
default:
err = fmt.Errorf("unsupport contentType:%s", contentType)
}
if err != nil {
err = &DeserializationError{
Err: err,
Snapshot: body,
}
}
}
return err
}
func unmarshalCallbackBody(result any, output *OperationOutput) error {
var err error
var body []byte
if output.Body != nil {
defer output.Body.Close()
if body, err = io.ReadAll(output.Body); err != nil {
return err
}
}
if len(body) > 0 {
switch r := result.(type) {
case *PutObjectResult:
if err = json.Unmarshal(body, &r.CallbackResult); err != nil {
return err
}
case *CompleteMultipartUploadResult:
if err = json.Unmarshal(body, &r.CallbackResult); err != nil {
return err
}
}
}
return err
}
func unmarshalHeader(result any, output *OperationOutput) error {
val := reflect.ValueOf(result)
switch val.Kind() {
case reflect.Pointer, reflect.Interface:
if val.IsNil() {
return nil
}
val = val.Elem()
}
if val.Kind() != reflect.Struct || output == nil {
return nil
}
filedInfos := map[string]fieldInfo{}
t := val.Type()
var usermetaKeys []string
for k := 0; k < t.NumField(); k++ {
if tag, ok := t.Field(k).Tag.Lookup("output"); ok {
tokens := strings.Split(tag, ",")
if len(tokens) < 2 {
continue
}
// header|query|body,filed_name,[required,time,usermeta...]
switch tokens[0] {
case "header":
lowkey := strings.ToLower(tokens[1])
var flags int = 0
if len(tokens) >= 3 {
flags = parseFiledFlags(tokens[2:])
}
filedInfos[lowkey] = fieldInfo{idx: k, flags: flags}
if flags&fTypeUsermeta != 0 {
usermetaKeys = append(usermetaKeys, lowkey)
}
}
}
}
var err error
for key, vv := range output.Headers {
lkey := strings.ToLower(key)
for _, prefix := range usermetaKeys {
if strings.HasPrefix(lkey, prefix) {
if field, ok := filedInfos[prefix]; ok {
if field.flags&fTypeUsermeta != 0 {
mapKey := strings.TrimPrefix(lkey, prefix)
err = setMapStringReflectValue(val.Field(field.idx), mapKey, vv[0])
}
}
}
}
if field, ok := filedInfos[lkey]; ok {
if field.flags&fTypeTime != 0 {
if t, err := http.ParseTime(vv[0]); err == nil {
err = setTimeReflectValue(val.Field(field.idx), t)
}
} else {
err = setReflectValue(val.Field(field.idx), vv[0])
}
if err != nil {
return err
}
}
}
return nil
}
func unmarshalHeaderLite(result any, output *OperationOutput) error {
val := reflect.ValueOf(result)
switch val.Kind() {
case reflect.Pointer, reflect.Interface:
if val.IsNil() {
return nil
}
val = val.Elem()
}
if val.Kind() != reflect.Struct || output == nil {
return nil
}
t := val.Type()
for k := 0; k < t.NumField(); k++ {
if tag := t.Field(k).Tag.Get("output"); tag != "" {
tokens := strings.Split(tag, ",")
if len(tokens) != 2 {
continue
}
switch tokens[0] {
case "header":
if src := output.Headers.Get(tokens[1]); src != "" {
if err := setReflectValue(val.Field(k), src); err != nil {
return err
}
}
}
}
}
return nil
}
func (c *Client) unmarshalOutput(result any, output *OperationOutput, handlers ...func(any, *OperationOutput) error) error {
// Common
if cm, ok := result.(ResultCommonInterface); ok {
cm.CopyIn(output.Status, output.StatusCode, output.Headers, output.OpMetadata)
}
var err error
for _, h := range handlers {
if err = h(result, output); err != nil {
break
}
}
return err
}
func updateContentMd5(_ any, input *OperationInput) error {
var err error
var contentMd5 string
if input.Body != nil {
var r io.ReadSeeker
var ok bool
if r, ok = input.Body.(io.ReadSeeker); !ok {
buf, _ := io.ReadAll(input.Body)
r = bytes.NewReader(buf)
input.Body = r
}
h := md5.New()
if _, err = copySeekableBody(h, r); err != nil {
// error
} else {
contentMd5 = base64.StdEncoding.EncodeToString(h.Sum(nil))
}
} else {
contentMd5 = "1B2M2Y8AsgTpgAmY7PhCfg=="
}
// set content-md5 and content-type
if err == nil {
if input.Headers == nil {
input.Headers = map[string]string{}
}
input.Headers["Content-MD5"] = contentMd5
}
return err
}
func updateContentType(_ any, input *OperationInput) error {
if input.Headers == nil {
input.Headers = map[string]string{}
}
if _, ok := input.Headers[HTTPHeaderContentType]; !ok {
value := TypeByExtension(ToString(input.Key))
if value == "" {
value = contentTypeDefault
}
input.Headers[HTTPHeaderContentType] = value
}
return nil
}
func addProgress(request any, input *OperationInput) error {
var w io.Writer
switch req := request.(type) {
case *PutObjectRequest:
if req.ProgressFn == nil {
return nil
}
w = NewProgress(req.ProgressFn, GetReaderLen(input.Body))
case *AppendObjectRequest:
if req.ProgressFn == nil {
return nil
}
w = NewProgress(req.ProgressFn, GetReaderLen(input.Body))
case *UploadPartRequest:
if req.ProgressFn == nil {
return nil
}
w = NewProgress(req.ProgressFn, GetReaderLen(input.Body))
default:
return nil
}
input.OpMetadata.Add(OpMetaKeyRequestBodyTracker, w)
return nil
}
func addProcess(request any, input *OperationInput) error {
switch req := request.(type) {
case *ProcessObjectRequest:
if req.Process == nil {
return nil
}
processData := fmt.Sprintf("%v=%v", "x-oss-process", ToString(req.Process))
input.Body = strings.NewReader(processData)
case *AsyncProcessObjectRequest:
if req.AsyncProcess == nil {
return nil
}
processData := fmt.Sprintf("%v=%v", "x-oss-async-process", ToString(req.AsyncProcess))
input.Body = strings.NewReader(processData)
default:
return nil
}
return nil
}
func addCrcCheck(_ any, input *OperationInput) error {
var w io.Writer = NewCRC64(0)
input.OpMetadata.Add(OpMetaKeyRequestBodyTracker, w)
input.OpMetadata.Add(OpMetaKeyResponsHandler, func(response *http.Response) error {
return checkResponseHeaderCRC64(fmt.Sprint(w.(hash.Hash64).Sum64()), response.Header)
})
return nil
}
func addCallback(_ any, input *OperationInput) error {
input.OpMetadata.Add(OpMetaKeyResponsHandler, callbackErrorResponseHandler)
return nil
}
func enableNonStream(_ any, input *OperationInput) error {
input.OpMetadata.Add(OpMetaKeyResponsHandler, func(response *http.Response) error {
return nonStreamResponseHandler(response)
})
return nil
}
func (c *Client) updateContentType(request any, input *OperationInput) error {
if !c.hasFeature(FeatureAutoDetectMimeType) {
return nil
}
return updateContentType(request, input)
}
func (c *Client) addCrcCheck(request any, input *OperationInput) error {
if !c.hasFeature(FeatureEnableCRC64CheckUpload) {
return nil
}
return addCrcCheck(request, input)
}
func encodeSourceObject(request any) string {
var bucket, key, versionId string
switch req := request.(type) {
case *CopyObjectRequest:
key = ToString(req.SourceKey)
if req.SourceBucket != nil {
bucket = *req.SourceBucket
} else {
bucket = ToString(req.Bucket)
}
versionId = ToString(req.SourceVersionId)
case *UploadPartCopyRequest:
key = ToString(req.SourceKey)
if req.SourceBucket != nil {
bucket = *req.SourceBucket
} else {
bucket = ToString(req.Bucket)
}
versionId = ToString(req.SourceVersionId)
}
source := fmt.Sprintf("/%s/%s", bucket, escapePath(key, false))
if versionId != "" {
source += "?versionId=" + versionId
}
return source
}
func (c *Client) toClientError(err error, code string, output *OperationOutput) error {
if err == nil {
return nil
}
return &ClientError{
Code: code,
Message: fmt.Sprintf("execute %s fail, error code is %s, request id:%s",
output.Input.OpName,
code,
output.Headers.Get(HeaderOssRequestID),
),
Err: err}
}
func (c *Client) hasFeature(flag FeatureFlagsType) bool {
return (c.options.FeatureFlags & flag) > 0
}
func (c *Client) retryMaxAttempts(opts *Options) int {
if opts == nil {
opts = &c.options
}
if opts.RetryMaxAttempts != nil {
return ToInt(opts.RetryMaxAttempts)
}
if opts.Retryer != nil {
return opts.Retryer.MaxAttempts()
}
return retry.DefaultMaxAttempts
}
func (c *Client) dumpOperationOutput(output *OperationOutput) string {
if output == nil {
return ""
}
return fmt.Sprintf("http.Request[%p] Status:%v, StatusCode%v, RequestId:%v",
output.httpRequest, output.Status, output.StatusCode,
output.Headers.Get(HeaderOssRequestID),
)
}
// LoggerHTTPReq Print the header information of the http request
func (c *Client) logHttpPRequet(request *http.Request) {
if c.getLogLevel() < LogDebug {
return
}
var logBuffer bytes.Buffer
logBuffer.WriteString(fmt.Sprintf("http.request[%p]", request))
if request != nil {
logBuffer.WriteString(fmt.Sprintf("Method:%s\t", request.Method))
logBuffer.WriteString(fmt.Sprintf("Host:%s\t", request.URL.Host))
logBuffer.WriteString(fmt.Sprintf("Path:%s\t", request.URL.Path))
logBuffer.WriteString(fmt.Sprintf("Query:%s\t", request.URL.RawQuery))
logBuffer.WriteString(fmt.Sprintf("Header info:"))
for k, v := range request.Header {
var valueBuffer bytes.Buffer
for j := 0; j < len(v); j++ {
if j > 0 {
valueBuffer.WriteString(" ")
}
valueBuffer.WriteString(v[j])
}
logBuffer.WriteString(fmt.Sprintf("\t%s:%s", k, valueBuffer.String()))
}
}
c.inner.Log.Debugf("%s", logBuffer.String())
}
// LoggerHTTPResp Print Response to http request
func (c *Client) logHttpResponse(request *http.Request, response *http.Response) {
if c.getLogLevel() < LogDebug {
return
}
var logBuffer bytes.Buffer
logBuffer.WriteString(fmt.Sprintf("http.request[%p]|http.response[%p]", request, response))
if response != nil {
logBuffer.WriteString(fmt.Sprintf("StatusCode:%d\t", response.StatusCode))
logBuffer.WriteString(fmt.Sprintf("Header info:"))
for k, v := range response.Header {
var valueBuffer bytes.Buffer
for j := 0; j < len(v); j++ {
if j > 0 {
valueBuffer.WriteString(" ")
}
valueBuffer.WriteString(v[j])
}
logBuffer.WriteString(fmt.Sprintf("\t%s:%s", k, valueBuffer.String()))
}
}
c.inner.Log.Debugf("%s", logBuffer.String())
}
func (c *Client) getLogLevel() int {
if c.inner.Log != nil {
return c.inner.Log.Level()
}
return LogOff
}
// Content-Type
const (
contentTypeDefault string = "application/octet-stream"
contentTypeXML = "application/xml"
)