func()

in oss/uploader.go [508:681]


// multiPart uploads the request body as a multipart upload.
//
// It obtains an upload ID from getUploadId, starts u.options.ParallelNum
// worker goroutines that call UploadPart for every chunk produced by
// nextReader, and finally calls CompleteMultipartUpload with the collected
// parts ordered by sort.Sort.
//
// If any step fails, the multipart upload is aborted unless
// u.options.LeavePartsOnError is set, and the error is returned through
// wrapErr. When FeatureEnableCRC64CheckUpload is enabled, the CRC combined
// from the uploaded parts is checked against the headers of the completion
// response before the result is returned.
func (u *uploaderDelegate) multiPart() (*UploadResult, error) {
	// Close the part buffer pool (if one was created) on every exit path.
	defer func() {
		if u.partPool != nil {
			u.partPool.Close()
		}
	}()

	// errBox gives every value stored in errValue the same concrete type.
	// atomic.Value panics when it is handed nil or a value whose concrete
	// type differs from the one stored first, which storing raw errors hits
	// as soon as two different kinds of failure are reported.
	type errBox struct{ err error }

	var (
		wg        sync.WaitGroup
		mu        sync.Mutex // guards parts, crcParts, u.transferred and ProgressFn calls
		parts     UploadParts
		errValue  atomic.Value // holds an errBox once a failure has been recorded
		crcParts  uploadPartCRCs
		enableCRC = (u.base.featureFlags & FeatureEnableCRC64CheckUpload) > 0
	)

	// Init the multipart
	uploadIdInfo, err := u.getUploadId()
	if err != nil {
		return nil, u.wrapErr("", err)
	}
	uploadId := uploadIdInfo.uploadId
	startPartNum := uploadIdInfo.startNum

	// Record the upload ID in the checkpoint, when one is in use.
	if u.checkpoint != nil {
		u.checkpoint.Info.Data.UploadInfo.UploadId = uploadId
		u.checkpoint.dump()
	}

	// saveErrFn records a failure; nil is ignored so callers may pass the
	// result of an operation without checking it first.
	saveErrFn := func(e error) {
		if e == nil {
			return
		}
		errValue.Store(errBox{err: e})
	}

	// getErrFn returns the most recently recorded failure, or nil.
	getErrFn := func() error {
		if box, ok := errValue.Load().(errBox); ok {
			return box.err
		}
		return nil
	}

	// readChunkFn runs in worker goroutines and pulls chunks off ch until it
	// is closed. After a failure has been recorded, remaining chunks are
	// still drained (and cleaned up) but no longer uploaded, so the producer
	// never blocks on a full channel.
	readChunkFn := func(ch chan uploaderChunk) {
		defer wg.Done()
		for data := range ch {
			if getErrFn() == nil {
				upResult, upErr := u.client.UploadPart(
					u.context,
					&UploadPartRequest{
						Bucket:              u.request.Bucket,
						Key:                 u.request.Key,
						UploadId:            Ptr(uploadId),
						PartNumber:          data.partNum,
						Body:                data.body,
						CSEMultiPartContext: uploadIdInfo.cseContext,
						RequestPayer:        u.request.RequestPayer,
					},
					u.options.ClientOptions...)

				if upErr != nil {
					saveErrFn(upErr)
				} else {
					mu.Lock()
					parts = append(parts, UploadPart{ETag: upResult.ETag, PartNumber: data.partNum})
					if enableCRC {
						crcParts = append(crcParts,
							uploadPartCRC{partNumber: data.partNum, hashCRC64: upResult.HashCRC64, size: data.size})
					}
					if u.request.ProgressFn != nil {
						u.transferred += int64(data.size)
						u.request.ProgressFn(int64(data.size), u.transferred, u.totalSize)
					}
					mu.Unlock()
				}
			}
			data.cleanup()
		}
	}

	ch := make(chan uploaderChunk, u.options.ParallelNum)
	for i := 0; i < u.options.ParallelNum; i++ {
		wg.Add(1)
		go readChunkFn(ch)
	}

	// Read and queue the parts
	var (
		qnum int32 = startPartNum
		qerr error
	)

	// A non-zero reader position means part of the body was uploaded
	// before: seed the completion list with those parts and report the
	// already-transferred bytes. No chunk has been queued yet, so the
	// workers are still idle and the lock is not needed here.
	if u.readerPos > 0 {
		for _, p := range u.uploadedParts {
			parts = append(parts, UploadPart{PartNumber: p.PartNumber, ETag: p.ETag})
		}
		if u.request.ProgressFn != nil {
			u.transferred = u.readerPos
			u.request.ProgressFn(u.readerPos, u.transferred, u.totalSize)
		}
	}

	for getErrFn() == nil && qerr == nil {
		var (
			reader       io.ReadSeeker
			nextChunkLen int
			cleanup      func()
		)

		reader, nextChunkLen, cleanup, qerr = u.nextReader()

		// io.EOF marks the last chunk and is not a failure. A zero-length
		// chunk means there is nothing left to send; it only counts as an
		// error when nextReader reported a real one.
		readFailed := qerr != nil && qerr != io.EOF
		if readFailed || nextChunkLen == 0 {
			if cleanup != nil {
				cleanup()
			}
			if readFailed {
				saveErrFn(qerr)
			}
			break
		}
		qnum++
		ch <- uploaderChunk{body: reader, partNum: qnum, cleanup: cleanup, size: nextChunkLen}
	}

	// Close the channel, wait for workers
	close(ch)
	wg.Wait()

	// Complete upload
	var cmResult *CompleteMultipartUploadResult
	if err = getErrFn(); err == nil {
		sort.Sort(parts)
		cmRequest := &CompleteMultipartUploadRequest{}
		copyRequest(cmRequest, u.request)
		cmRequest.UploadId = Ptr(uploadId)
		cmRequest.CompleteMultipartUpload = &CompleteMultipartUpload{Parts: parts}
		cmResult, err = u.client.CompleteMultipartUpload(u.context, cmRequest, u.options.ClientOptions...)
	}

	if err != nil {
		if !u.options.LeavePartsOnError {
			abortRequest := &AbortMultipartUploadRequest{}
			copyRequest(abortRequest, u.request)
			abortRequest.UploadId = Ptr(uploadId)
			// Best effort: the caller gets the error that caused the abort,
			// not a failure of the abort request itself.
			_, _ = u.client.AbortMultipartUpload(u.context, abortRequest, u.options.ClientOptions...)
		}
		return nil, u.wrapErr(uploadId, err)
	}

	if enableCRC {
		calcCRC := fmt.Sprint(u.combineCRC(crcParts))
		if err = checkResponseHeaderCRC64(calcCRC, cmResult.Headers); err != nil {
			return nil, u.wrapErr(uploadId, err)
		}
	}

	return &UploadResult{
		UploadId:     Ptr(uploadId),
		ETag:         cmResult.ETag,
		VersionId:    cmResult.VersionId,
		HashCRC64:    cmResult.HashCRC64,
		ResultCommon: cmResult.ResultCommon,
	}, nil
}