in oss/copier.go [326:478]
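// multiCopy performs a server-side copy of a large object as a multipart
// upload: it initiates the upload, fans the source out to ParallelNum
// UploadPartCopy workers in PartSize byte ranges, completes the upload from
// the collected parts, and aborts on failure unless LeavePartsOnError is set.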
func (d *copierDelegate) multiCopy() (*CopyResult, error) {
	var (
		wg       sync.WaitGroup // tracks the copy workers
		mu       sync.Mutex     // guards parts and the progress counters
		parts    UploadParts    // completed parts, sorted before completion
		errValue atomic.Value   // latches an error from any worker
	)
	// Initiate the multipart upload
	imRequest, err := d.newInitiateMultipartUpload()
	if err != nil {
		return nil, d.wrapErr("", err)
	}
	initResult, err := d.base.client.InitiateMultipartUpload(d.context, imRequest, d.options.ClientOptions...)
	if err != nil {
		return nil, d.wrapErr("", err)
	}
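	// Latch worker errors in an atomic.Value: saveErrFn records a failure
	// (a later failure may overwrite an earlier one) and getErrFn lets the
	// producer and the workers stop scheduling new parts once any copy fails.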
	saveErrFn := func(e error) {
		errValue.Store(e)
	}
	getErrFn := func() error {
		v := errValue.Load()
		if v == nil {
			return nil
		}
		e, _ := v.(error)
		return e
	}
	// Scale the read/write timeout for UploadPartCopy with the part size:
	// start from the transport default and add 10s for each additional
	// 200 MiB beyond the first, stopping once the timeout exceeds 50s.
	const PART_SIZE int64 = 200 * 1024 * 1024
	const STEP time.Duration = 10 * time.Second
	mpcTimeout := transport.DefaultReadWriteTimeout
	partSize := d.options.PartSize
	for partSize > PART_SIZE {
		mpcTimeout += STEP
		partSize -= PART_SIZE
		if mpcTimeout > 50*time.Second {
			break
		}
	}
	mpcClientOptions := append(d.options.ClientOptions, OpReadWriteTimeout(mpcTimeout))
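	// readChunkFn runs in each worker goroutine: it pulls chunks off the ch
	// channel and copies the corresponding byte range with UploadPartCopy,
	// recording the returned ETag for the final CompleteMultipartUpload.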
	readChunkFn := func(ch chan copyChunk) {
		defer wg.Done()
		for data := range ch {
			// Once any part has failed, keep draining the channel without
			// copying so the producer is never blocked on a full buffer.
			if getErrFn() == nil {
				upResult, err := d.base.client.UploadPartCopy(
					d.context,
					&UploadPartCopyRequest{
						Bucket:          d.request.Bucket,
						Key:             d.request.Key,
						SourceBucket:    d.request.SourceBucket,
						SourceKey:       d.request.SourceKey,
						SourceVersionId: d.request.SourceVersionId,
						UploadId:        initResult.UploadId,
						PartNumber:      data.partNum,
						Range:           Ptr(data.sourceRange),
						RequestPayer:    d.request.RequestPayer,
					}, mpcClientOptions...)
				if err == nil {
					mu.Lock()
					parts = append(parts, UploadPart{ETag: upResult.ETag, PartNumber: data.partNum})
					d.transferred += data.size
					d.progressCallback(data.size)
					mu.Unlock()
				} else {
					saveErrFn(err)
				}
			}
		}
	}
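	// Start ParallelNum workers on a channel whose buffer matches the worker
	// count; the producer below blocks only when every worker is busy and the
	// buffer is full.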
	ch := make(chan copyChunk, d.options.ParallelNum)
	for i := 0; i < d.options.ParallelNum; i++ {
		wg.Add(1)
		go readChunkFn(ch)
	}
	// Produce the parts: walk the source in PartSize ranges and queue one
	// chunk per part, stopping early if a worker has already failed.
	var (
		qnum      int32 = 0
		totalSize int64 = d.sizeInBytes
		readerPos int64 = 0
	)
	for getErrFn() == nil && readerPos < totalSize {
		n := d.options.PartSize
		if bytesLeft := totalSize - readerPos; bytesLeft <= n {
			n = bytesLeft
		}
		qnum++
		ch <- copyChunk{partNum: qnum, size: n, sourceRange: fmt.Sprintf("bytes=%v-%v", readerPos, readerPos+n-1)}
		readerPos += n
	}
	// Close the channel so the workers exit after draining it, then wait for
	// every in-flight UploadPartCopy to finish before completing the upload.
	close(ch)
	wg.Wait()
	// Complete the multipart upload from the collected parts. Parts arrive
	// out of order from the workers, so sort them by part number first.
	var cmResult *CompleteMultipartUploadResult
	if err = getErrFn(); err == nil {
		sort.Sort(parts)
		cmRequest := &CompleteMultipartUploadRequest{}
		copyRequest(cmRequest, d.request)
		cmRequest.UploadId = initResult.UploadId
		cmRequest.CompleteMultipartUpload = &CompleteMultipartUpload{Parts: parts}
		cmResult, err = d.base.client.CompleteMultipartUpload(d.context, cmRequest, d.options.ClientOptions...)
	}
	if err != nil {
		// Abort the upload so the already-copied parts are not left behind,
		// unless the caller opted to keep them for a later resume.
		if !d.options.LeavePartsOnError {
			amRequest := &AbortMultipartUploadRequest{}
			copyRequest(amRequest, d.request)
			amRequest.UploadId = initResult.UploadId
			_, _ = d.base.client.AbortMultipartUpload(d.context, amRequest, d.options.ClientOptions...)
		}
		return nil, d.wrapErr(*initResult.UploadId, err)
	}
	// Verify end-to-end integrity: compare the destination's CRC-64 against
	// the source object's CRC-64 header when both are available.
	if cmResult.HashCRC64 != nil {
		srcCrc := d.metaProp.Headers.Get(HeaderOssCRC64)
		if srcCrc != "" {
			destCrc := ToString(cmResult.HashCRC64)
			if destCrc != srcCrc {
				return nil, d.wrapErr(*initResult.UploadId, fmt.Errorf("crc is inconsistent, source %s, destination %s", srcCrc, destCrc))
			}
		}
	}
	return &CopyResult{
		UploadId:     initResult.UploadId,
		ETag:         cmResult.ETag,
		VersionId:    cmResult.VersionId,
		HashCRC64:    cmResult.HashCRC64,
		ResultCommon: cmResult.ResultCommon,
	}, nil
}
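// Usage sketch: multiCopy is internal; a caller reaches it through the
// Copier's public Copy entry point once the source is large enough to split.
// The constructor and request names below (NewCopier, CopyRequest) are
// assumptions inferred from this excerpt's types, not shown here:
//
//	copier := oss.NewCopier(client) // assumed constructor
//	result, err := copier.Copy(context.TODO(), &oss.CopyRequest{
//		Bucket:       oss.Ptr("dst-bucket"),
//		Key:          oss.Ptr("dst-key"),
//		SourceBucket: oss.Ptr("src-bucket"),
//		SourceKey:    oss.Ptr("src-key"),
//	}, func(o *oss.CopierOptions) {
//		o.PartSize = 200 * 1024 * 1024 // drives both chunking and the timeout scaling above
//		o.ParallelNum = 4              // number of UploadPartCopy workers
//	})
//	if err == nil {
//		fmt.Println("copied, etag:", oss.ToString(result.ETag))
//	}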