in oss/uploader.go [508:681]
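// multiPart performs a concurrent multipart upload: it initializes (or
// resumes) the multipart upload, fans part uploads out to a pool of worker
// goroutines, and finally completes the upload, aborting it on failure unless
// LeavePartsOnError is set.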
func (u *uploaderDelegate) multiPart() (*UploadResult, error) {
release := func() {
if u.partPool != nil {
u.partPool.Close()
}
}
defer release()
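	// Shared state for the worker goroutines: parts and crcParts are guarded
	// by mu, and errValue records any error reported by a worker or by the
	// chunk-queueing loop below.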
var (
wg sync.WaitGroup
mu sync.Mutex
parts UploadParts
errValue atomic.Value
crcParts uploadPartCRCs
enableCRC = (u.base.featureFlags & FeatureEnableCRC64CheckUpload) > 0
)
	// Initialize the multipart upload, or reuse an existing upload id when resuming.
uploadIdInfo, err := u.getUploadId()
if err != nil {
return nil, u.wrapErr("", err)
}
//fmt.Printf("getUploadId result: %v, %#v\n", uploadId, err)
uploadId := uploadIdInfo.uploadId
startPartNum := uploadIdInfo.startNum
// Update Checkpoint
if u.checkpoint != nil {
u.checkpoint.Info.Data.UploadInfo.UploadId = uploadId
u.checkpoint.dump()
}
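	// saveErrFn/getErrFn publish a single error value across goroutines via atomic.Value.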
saveErrFn := func(e error) {
errValue.Store(e)
}
getErrFn := func() error {
v := errValue.Load()
if v == nil {
return nil
}
e, _ := v.(error)
return e
}
	// readChunkFn runs in each worker goroutine, pulling chunks off the ch
	// channel and uploading each one as a part.
readChunkFn := func(ch chan uploaderChunk) {
defer wg.Done()
for {
data, ok := <-ch
if !ok {
break
}
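			// If an error has already been recorded, skip the UploadPart call but
			// keep receiving so the producer is never blocked on a full channel;
			// the cleanup below still releases the chunk's resources.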
if getErrFn() == nil {
upResult, err := u.client.UploadPart(
u.context,
&UploadPartRequest{
Bucket: u.request.Bucket,
Key: u.request.Key,
UploadId: Ptr(uploadId),
PartNumber: data.partNum,
Body: data.body,
CSEMultiPartContext: uploadIdInfo.cseContext,
RequestPayer: u.request.RequestPayer,
},
u.options.ClientOptions...)
//fmt.Printf("UploadPart result: %#v, %#v\n", upResult, err)
if err == nil {
mu.Lock()
parts = append(parts, UploadPart{ETag: upResult.ETag, PartNumber: data.partNum})
if enableCRC {
crcParts = append(crcParts,
uploadPartCRC{partNumber: data.partNum, hashCRC64: upResult.HashCRC64, size: data.size})
}
if u.request.ProgressFn != nil {
u.transferred += int64(data.size)
u.request.ProgressFn(int64(data.size), u.transferred, u.totalSize)
}
mu.Unlock()
} else {
saveErrFn(err)
}
}
data.cleanup()
}
}
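	// A buffered channel plus ParallelNum workers bound the number of parts in flight.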
ch := make(chan uploaderChunk, u.options.ParallelNum)
for i := 0; i < u.options.ParallelNum; i++ {
wg.Add(1)
go readChunkFn(ch)
}
// Read and queue the parts
	var (
		qnum int32 = startPartNum
		qerr error
	)
	// Resume case: fold in the parts that were already uploaded in a previous
	// attempt and report them as transferred progress.
if u.readerPos > 0 {
for _, p := range u.uploadedParts {
parts = append(parts, UploadPart{PartNumber: p.PartNumber, ETag: p.ETag})
}
if u.request.ProgressFn != nil {
u.transferred = u.readerPos
u.request.ProgressFn(u.readerPos, u.transferred, u.totalSize)
}
}
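	// Read the source sequentially and queue one chunk per part until EOF,
	// stopping early if any worker has recorded an error.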
for getErrFn() == nil && qerr == nil {
var (
reader io.ReadSeeker
nextChunkLen int
cleanup func()
)
reader, nextChunkLen, cleanup, qerr = u.nextReader()
		// A real read error (anything other than io.EOF) stops queueing and is
		// recorded so the workers stop uploading as well.
		if qerr != nil && qerr != io.EOF {
			cleanup()
			saveErrFn(qerr)
			break
		}
		// Nothing left to read: stop queueing without treating io.EOF as a failure.
		if nextChunkLen == 0 {
			cleanup()
			break
		}
qnum++
//fmt.Printf("send chunk: %d\n", qnum)
ch <- uploaderChunk{body: reader, partNum: qnum, cleanup: cleanup, size: nextChunkLen}
}
// Close the channel, wait for workers
close(ch)
wg.Wait()
	// Complete the multipart upload once every queued part has finished.
var cmResult *CompleteMultipartUploadResult
if err = getErrFn(); err == nil {
sort.Sort(parts)
cmRequest := &CompleteMultipartUploadRequest{}
copyRequest(cmRequest, u.request)
cmRequest.UploadId = Ptr(uploadId)
cmRequest.CompleteMultipartUpload = &CompleteMultipartUpload{Parts: parts}
cmResult, err = u.client.CompleteMultipartUpload(u.context, cmRequest, u.options.ClientOptions...)
}
//fmt.Printf("CompleteMultipartUpload cmResult: %#v, %#v\n", cmResult, err)
if err != nil {
		// Abort the multipart upload unless the caller asked to keep the
		// uploaded parts for a later resume.
if !u.options.LeavePartsOnError {
abortRequest := &AbortMultipartUploadRequest{}
copyRequest(abortRequest, u.request)
abortRequest.UploadId = Ptr(uploadId)
_, _ = u.client.AbortMultipartUpload(u.context, abortRequest, u.options.ClientOptions...)
}
return nil, u.wrapErr(uploadId, err)
}
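	// Cross-check the CRC64 combined from the individual parts against the
	// value the service returned for the completed object.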
if enableCRC {
		calcCRC := fmt.Sprint(u.combineCRC(crcParts))
		if err = checkResponseHeaderCRC64(calcCRC, cmResult.Headers); err != nil {
return nil, u.wrapErr(uploadId, err)
}
}
return &UploadResult{
UploadId: Ptr(uploadId),
ETag: cmResult.ETag,
VersionId: cmResult.VersionId,
HashCRC64: cmResult.HashCRC64,
ResultCommon: cmResult.ResultCommon,
}, nil
}
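// Caller-side sketch (illustrative only): a minimal, hedged example of how an
// upload typically reaches this multipart path through the uploader's public
// API. The constructor, option, and method names used here (NewUploader,
// UploaderOptions.ParallelNum/PartSize, UploadFile, PutObjectRequest) are
// assumptions drawn from this function's fields and may differ from the
// package's actual surface.
//
//	u := NewUploader(client, func(uo *UploaderOptions) {
//		uo.ParallelNum = 4            // assumed option; matches u.options.ParallelNum above
//		uo.PartSize = 8 * 1024 * 1024 // assumed option; the chunk size fed to nextReader
//	})
//	result, err := u.UploadFile(context.TODO(), &PutObjectRequest{
//		Bucket: Ptr("examplebucket"),
//		Key:    Ptr("exampleobject"),
//	}, "/tmp/largefile")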