func()

in oss/copier.go [326:478]


func (d *copierDelegate) multiCopy() (*CopyResult, error) {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		parts    UploadParts
		errValue atomic.Value
	)

	// Init the multipart
	imRequest, err := d.newInitiateMultipartUpload()
	if err != nil {
		return nil, d.wrapErr("", err)
	}

	initResult, err := d.base.client.InitiateMultipartUpload(d.context, imRequest, d.options.ClientOptions...)
	if err != nil {
		return nil, d.wrapErr("", err)
	}

	saveErrFn := func(e error) {
		errValue.Store(e)
	}

	getErrFn := func() error {
		v := errValue.Load()
		if v == nil {
			return nil
		}
		e, _ := v.(error)
		return e
	}

	// readChunk runs in worker goroutines to pull chunks off of the ch channel
	// timeout for MultiPartCopy API
	// 10s per 200M, max timeout is 50s
	const PART_SIZE int64 = 200 * 1024 * 1024
	const STEP time.Duration = 10 * time.Second
	mpcTimeout := transport.DefaultReadWriteTimeout
	partSize := d.options.PartSize
	for partSize > PART_SIZE {
		mpcTimeout += STEP
		partSize -= PART_SIZE
		if mpcTimeout > 50*time.Second {
			break
		}
	}
	mpcClientOptions := append(d.options.ClientOptions, OpReadWriteTimeout(mpcTimeout))

	readChunkFn := func(ch chan copyChunk) {
		defer wg.Done()
		for {
			data, ok := <-ch
			if !ok {
				break
			}
			if getErrFn() == nil {
				upResult, err := d.base.client.UploadPartCopy(
					d.context,
					&UploadPartCopyRequest{
						Bucket:          d.request.Bucket,
						Key:             d.request.Key,
						SourceBucket:    d.request.SourceBucket,
						SourceKey:       d.request.SourceKey,
						SourceVersionId: d.request.SourceVersionId,
						UploadId:        initResult.UploadId,
						PartNumber:      data.partNum,
						Range:           Ptr(data.sourceRange),
						RequestPayer:    d.request.RequestPayer,
					}, mpcClientOptions...)
				//fmt.Printf("UploadPart result: %#v, %#v\n", upResult, err)
				if err == nil {
					mu.Lock()
					parts = append(parts, UploadPart{ETag: upResult.ETag, PartNumber: data.partNum})
					d.transferred += data.size
					d.progressCallback(data.size)
					mu.Unlock()
				} else {
					saveErrFn(err)
				}
			}
		}
	}

	ch := make(chan copyChunk, d.options.ParallelNum)
	for i := 0; i < d.options.ParallelNum; i++ {
		wg.Add(1)
		go readChunkFn(ch)
	}

	// Read and queue the parts
	var (
		qnum      int32 = 0
		totalSize int64 = d.sizeInBytes
		readerPos int64 = 0
	)
	for getErrFn() == nil && readerPos < totalSize {
		n := d.options.PartSize
		bytesLeft := totalSize - readerPos
		if bytesLeft <= d.options.PartSize {
			n = bytesLeft
		}
		//fmt.Printf("send chunk: %d\n", qnum)
		qnum++
		ch <- copyChunk{partNum: qnum, size: n, sourceRange: fmt.Sprintf("bytes=%v-%v", readerPos, (readerPos + n - 1))}
		readerPos += n
	}

	// Close the channel, wait for workers
	close(ch)
	wg.Wait()

	// Complete upload
	var cmResult *CompleteMultipartUploadResult
	if err = getErrFn(); err == nil {
		sort.Sort(parts)
		cmRequest := &CompleteMultipartUploadRequest{}
		copyRequest(cmRequest, d.request)
		cmRequest.UploadId = initResult.UploadId
		cmRequest.CompleteMultipartUpload = &CompleteMultipartUpload{Parts: parts}
		cmResult, err = d.base.client.CompleteMultipartUpload(d.context, cmRequest, d.options.ClientOptions...)
	}
	//fmt.Printf("CompleteMultipartUpload cmResult: %#v, %#v\n", cmResult, err)

	if err != nil {
		//Abort
		if !d.options.LeavePartsOnError {
			amRequest := &AbortMultipartUploadRequest{}
			copyRequest(amRequest, d.request)
			amRequest.UploadId = initResult.UploadId
			_, _ = d.base.client.AbortMultipartUpload(d.context, amRequest, d.options.ClientOptions...)
		}
		return nil, d.wrapErr(*initResult.UploadId, err)
	}

	// check crc
	if cmResult.HashCRC64 != nil {
		srcCrc := d.metaProp.Headers.Get(HeaderOssCRC64)
		if srcCrc != "" {
			destCrc := ToString(cmResult.HashCRC64)
			if destCrc != srcCrc {
				return nil, d.wrapErr(*initResult.UploadId, fmt.Errorf("crc is inconsistent, source %s, destination %s", srcCrc, destCrc))
			}
		}
	}

	return &CopyResult{
		UploadId:     initResult.UploadId,
		ETag:         cmResult.ETag,
		VersionId:    cmResult.VersionId,
		HashCRC64:    cmResult.HashCRC64,
		ResultCommon: cmResult.ResultCommon,
	}, nil
}