oss/copier.go (458 lines of code) (raw):

package oss import ( "context" "fmt" "sort" "strings" "sync" "sync/atomic" "time" "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/transport" ) var metadataCopied = map[string]struct{}{ "content-type": {}, "content-language": {}, "content-encoding": {}, "content-disposition": {}, "cache-control": {}, "expires": {}, } type CopierOptions struct { PartSize int64 ParallelNum int MultipartCopyThreshold int64 LeavePartsOnError bool DisableShallowCopy bool ClientOptions []func(*Options) // MetaProperties and TagProperties takes effect in Copier.Copy MetadataProperties *HeadObjectResult TagProperties *GetObjectTaggingResult } type Copier struct { options CopierOptions client CopyAPIClient featureFlags FeatureFlagsType } // NewCopier creates a new Copier instance to copy objects. // Pass In additional functional options to customize the copier's behavior. func NewCopier(api CopyAPIClient, optFns ...func(*CopierOptions)) *Copier { options := CopierOptions{ PartSize: DefaultCopyPartSize, ParallelNum: DefaultCopyParallel, MultipartCopyThreshold: DefaultCopyThreshold, LeavePartsOnError: false, DisableShallowCopy: false, } for _, fn := range optFns { fn(&options) } options.TagProperties = nil options.MetadataProperties = nil c := &Copier{ client: api, options: options, } //Get Client Feature switch t := api.(type) { case *Client: c.featureFlags = t.options.FeatureFlags } return c } type CopyResult struct { UploadId *string ETag *string VersionId *string HashCRC64 *string ResultCommon } type CopyError struct { Err error UploadId string Path string } func (m *CopyError) Error() string { var extra string if m.Err != nil { extra = fmt.Sprintf(", cause: %s", m.Err.Error()) } return fmt.Sprintf("copy failed, upload id: %s%s", m.UploadId, extra) } func (m *CopyError) Unwrap() error { return m.Err } func (c *Copier) Copy(ctx context.Context, request *CopyObjectRequest, optFns ...func(*CopierOptions)) (*CopyResult, error) { // Copier wrapper delegate, err := c.newDelegate(ctx, request, optFns...) if err != nil { return nil, err } if err = delegate.checkSource(); err != nil { return nil, err } if err = delegate.applySource(); err != nil { return nil, err } return delegate.copy() } type copierDelegate struct { base *Copier options CopierOptions context context.Context request *CopyObjectRequest // Source's Info metaProp *HeadObjectResult tagProp *GetObjectTaggingResult sizeInBytes int64 transferred int64 } func (c *Copier) newDelegate(ctx context.Context, request *CopyObjectRequest, optFns ...func(*CopierOptions)) (*copierDelegate, error) { if request == nil { return nil, NewErrParamNull("request") } if request.Bucket == nil { return nil, NewErrParamNull("request.Bucket") } if request.Key == nil { return nil, NewErrParamNull("request.Key") } if request.SourceKey == nil { return nil, NewErrParamNull("request.SourceKey") } if request.MetadataDirective != nil && !isValidCopyDirective(*request.MetadataDirective) { return nil, NewErrParamInvalid("request.MetadataDirective") } if request.TaggingDirective != nil && !isValidCopyDirective(*request.TaggingDirective) { return nil, NewErrParamInvalid("request.TaggingDirective") } d := copierDelegate{ base: c, options: c.options, context: ctx, request: request, } for _, opt := range optFns { opt(&d.options) } if d.options.ParallelNum <= 0 { d.options.ParallelNum = DefaultCopyParallel } if d.options.PartSize <= 0 { d.options.PartSize = DefaultCopyPartSize } if d.options.MultipartCopyThreshold < 0 { d.options.MultipartCopyThreshold = DefaultCopyThreshold } d.tagProp = d.options.TagProperties d.metaProp = d.options.MetadataProperties return &d, nil } func (d *copierDelegate) checkSource() error { if d.metaProp != nil { return nil } var request HeadObjectRequest copyRequest(&request, d.request) if d.request.SourceBucket != nil { request.Bucket = d.request.SourceBucket } request.Key = d.request.SourceKey request.VersionId = d.request.SourceVersionId result, err := d.base.client.HeadObject(d.context, &request, d.options.ClientOptions...) if err != nil { return err } d.metaProp = result return nil } func (d *copierDelegate) applySource() error { d.sizeInBytes = d.metaProp.ContentLength // signle copy mode if d.sizeInBytes <= d.options.MultipartCopyThreshold { return nil } // multi part copy mode //Part Size partSize := d.options.PartSize if d.sizeInBytes > 0 { for d.sizeInBytes/partSize >= int64(MaxUploadParts) { partSize += d.options.PartSize } } d.options.PartSize = partSize return nil } func (d *copierDelegate) canUseShallowCopy() bool { if d.options.DisableShallowCopy { return false } // Change StorageClass if d.request.StorageClass != "" { return false } // Cross bucket if d.request.SourceBucket != nil && ToString(d.request.SourceBucket) != ToString(d.request.Bucket) { return false } // Decryption if d.metaProp.Headers.Get(HeaderOssServerSideEncryption) != "" { return false } return true } func (d *copierDelegate) copy() (*CopyResult, error) { if d.sizeInBytes <= d.options.MultipartCopyThreshold { return d.singleCopy() } else if d.canUseShallowCopy() { return d.shallowCopy() } return d.multiCopy() } func (d *copierDelegate) singleCopy() (*CopyResult, error) { result, err := d.base.client.CopyObject(d.context, d.request, d.options.ClientOptions...) if err != nil { return nil, d.wrapErr("", err) } // update d.transferred = d.sizeInBytes d.progressCallback(d.sizeInBytes) return &CopyResult{ ETag: result.ETag, HashCRC64: result.HashCRC64, VersionId: result.VersionId, ResultCommon: result.ResultCommon, }, nil } func (d *copierDelegate) shallowCopy() (*CopyResult, error) { // use signle copy first, if meets timeout, use multiCopy ctx, cancel := context.WithTimeout(d.context, 30*time.Second) defer cancel() result, err := d.base.client.CopyObject(ctx, d.request, d.options.ClientOptions...) if err != nil { if isContextError(ctx, &err) { return d.multiCopy() } return nil, d.wrapErr("", err) } // update d.transferred = d.sizeInBytes d.progressCallback(d.sizeInBytes) return &CopyResult{ ETag: result.ETag, HashCRC64: result.HashCRC64, VersionId: result.VersionId, ResultCommon: result.ResultCommon, }, nil } type copyChunk struct { partNum int32 size int64 sourceRange string } func (d *copierDelegate) multiCopy() (*CopyResult, error) { var ( wg sync.WaitGroup mu sync.Mutex parts UploadParts errValue atomic.Value ) // Init the multipart imRequest, err := d.newInitiateMultipartUpload() if err != nil { return nil, d.wrapErr("", err) } initResult, err := d.base.client.InitiateMultipartUpload(d.context, imRequest, d.options.ClientOptions...) if err != nil { return nil, d.wrapErr("", err) } saveErrFn := func(e error) { errValue.Store(e) } getErrFn := func() error { v := errValue.Load() if v == nil { return nil } e, _ := v.(error) return e } // readChunk runs in worker goroutines to pull chunks off of the ch channel // timeout for MultiPartCopy API // 10s per 200M, max timeout is 50s const PART_SIZE int64 = 200 * 1024 * 1024 const STEP time.Duration = 10 * time.Second mpcTimeout := transport.DefaultReadWriteTimeout partSize := d.options.PartSize for partSize > PART_SIZE { mpcTimeout += STEP partSize -= PART_SIZE if mpcTimeout > 50*time.Second { break } } mpcClientOptions := append(d.options.ClientOptions, OpReadWriteTimeout(mpcTimeout)) readChunkFn := func(ch chan copyChunk) { defer wg.Done() for { data, ok := <-ch if !ok { break } if getErrFn() == nil { upResult, err := d.base.client.UploadPartCopy( d.context, &UploadPartCopyRequest{ Bucket: d.request.Bucket, Key: d.request.Key, SourceBucket: d.request.SourceBucket, SourceKey: d.request.SourceKey, SourceVersionId: d.request.SourceVersionId, UploadId: initResult.UploadId, PartNumber: data.partNum, Range: Ptr(data.sourceRange), RequestPayer: d.request.RequestPayer, }, mpcClientOptions...) //fmt.Printf("UploadPart result: %#v, %#v\n", upResult, err) if err == nil { mu.Lock() parts = append(parts, UploadPart{ETag: upResult.ETag, PartNumber: data.partNum}) d.transferred += data.size d.progressCallback(data.size) mu.Unlock() } else { saveErrFn(err) } } } } ch := make(chan copyChunk, d.options.ParallelNum) for i := 0; i < d.options.ParallelNum; i++ { wg.Add(1) go readChunkFn(ch) } // Read and queue the parts var ( qnum int32 = 0 totalSize int64 = d.sizeInBytes readerPos int64 = 0 ) for getErrFn() == nil && readerPos < totalSize { n := d.options.PartSize bytesLeft := totalSize - readerPos if bytesLeft <= d.options.PartSize { n = bytesLeft } //fmt.Printf("send chunk: %d\n", qnum) qnum++ ch <- copyChunk{partNum: qnum, size: n, sourceRange: fmt.Sprintf("bytes=%v-%v", readerPos, (readerPos + n - 1))} readerPos += n } // Close the channel, wait for workers close(ch) wg.Wait() // Complete upload var cmResult *CompleteMultipartUploadResult if err = getErrFn(); err == nil { sort.Sort(parts) cmRequest := &CompleteMultipartUploadRequest{} copyRequest(cmRequest, d.request) cmRequest.UploadId = initResult.UploadId cmRequest.CompleteMultipartUpload = &CompleteMultipartUpload{Parts: parts} cmResult, err = d.base.client.CompleteMultipartUpload(d.context, cmRequest, d.options.ClientOptions...) } //fmt.Printf("CompleteMultipartUpload cmResult: %#v, %#v\n", cmResult, err) if err != nil { //Abort if !d.options.LeavePartsOnError { amRequest := &AbortMultipartUploadRequest{} copyRequest(amRequest, d.request) amRequest.UploadId = initResult.UploadId _, _ = d.base.client.AbortMultipartUpload(d.context, amRequest, d.options.ClientOptions...) } return nil, d.wrapErr(*initResult.UploadId, err) } // check crc if cmResult.HashCRC64 != nil { srcCrc := d.metaProp.Headers.Get(HeaderOssCRC64) if srcCrc != "" { destCrc := ToString(cmResult.HashCRC64) if destCrc != srcCrc { return nil, d.wrapErr(*initResult.UploadId, fmt.Errorf("crc is inconsistent, source %s, destination %s", srcCrc, destCrc)) } } } return &CopyResult{ UploadId: initResult.UploadId, ETag: cmResult.ETag, VersionId: cmResult.VersionId, HashCRC64: cmResult.HashCRC64, ResultCommon: cmResult.ResultCommon, }, nil } func (d *copierDelegate) newInitiateMultipartUpload() (*InitiateMultipartUploadRequest, error) { var err error imRequest := &InitiateMultipartUploadRequest{} copyRequest(imRequest, d.request) imRequest.DisableAutoDetectMimeType = true if err = d.overwirteMetadataProp(imRequest); err != nil { return nil, err } if err = d.overwirteTagProp(imRequest); err != nil { return nil, err } return imRequest, nil } func (d *copierDelegate) overwirteMetadataProp(imRequest *InitiateMultipartUploadRequest) error { copyRequest := d.request switch strings.ToLower(ToString(copyRequest.MetadataDirective)) { case "", "copy": if d.metaProp == nil { return fmt.Errorf("request.MetadataDirective is COPY, but meets nil metaProp for source") } imRequest.CacheControl = nil imRequest.ContentType = nil imRequest.ContentDisposition = nil imRequest.ContentEncoding = nil imRequest.Expires = nil imRequest.Metadata = nil imRequest.Headers = map[string]string{} // skip meta in Headers for k, v := range d.request.Headers { lowK := strings.ToLower(k) if strings.HasPrefix(lowK, "x-oss-meta") { //skip } else if _, ok := metadataCopied[lowK]; ok { //skip } else { imRequest.Headers[k] = v } } // copy meta form source for k, v := range d.metaProp.Headers { lowK := strings.ToLower(k) if strings.HasPrefix(lowK, "x-oss-meta") { imRequest.Headers[lowK] = v[0] } else if _, ok := metadataCopied[lowK]; ok { imRequest.Headers[lowK] = v[0] } } case "replace": // the metedata has been copied via the copyRequest function before default: return fmt.Errorf("Unsupport MetadataDirective, %s", ToString(d.request.MetadataDirective)) } return nil } func (d *copierDelegate) overwirteTagProp(imRequest *InitiateMultipartUploadRequest) error { switch strings.ToLower(ToString(d.request.TaggingDirective)) { case "", "copy": imRequest.Tagging = nil if d.metaProp.TaggingCount > 0 && d.tagProp == nil { request := &GetObjectTaggingRequest{} copyRequest(request, d.request) if d.request.SourceBucket != nil { request.Bucket = d.request.SourceBucket } request.Key = d.request.SourceKey request.VersionId = d.request.SourceVersionId result, err := d.base.client.GetObjectTagging(d.context, request, d.options.ClientOptions...) if err != nil { return err } d.tagProp = result } if d.tagProp != nil { var tags []string for _, t := range d.tagProp.Tags { tags = append(tags, fmt.Sprintf("%v=%v", ToString(t.Key), ToString(t.Value))) } if len(tags) > 0 { imRequest.Tagging = Ptr(strings.Join(tags, "&")) } } case "replace": // the tag has been copied via the copyRequest function before default: return fmt.Errorf("Unsupport TaggingDirective, %s", ToString(d.request.TaggingDirective)) } return nil } func (d *copierDelegate) wrapErr(uploadId string, err error) error { return &CopyError{ UploadId: uploadId, Path: fmt.Sprintf("oss://%s/%s", *d.request.Bucket, *d.request.Key), Err: err} } func (d *copierDelegate) progressCallback(increment int64) { if d.request.ProgressFn != nil { d.request.ProgressFn(increment, d.transferred, d.sizeInBytes) } }