oss/uploader.go:
package oss
import (
"bytes"
"context"
"fmt"
"io"
"os"
"sort"
"strconv"
"sync"
"sync/atomic"
)
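// UploaderOptions defines the configuration for an Uploader.
// PartSize and ParallelNum control the size and concurrency of multipart
// uploads; LeavePartsOnError keeps already-uploaded parts when an upload
// fails; EnableCheckpoint and CheckpointDir enable resumable uploads backed
// by a checkpoint file; ClientOptions are forwarded to every underlying call.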
type UploaderOptions struct {
PartSize int64
ParallelNum int
LeavePartsOnError bool
EnableCheckpoint bool
CheckpointDir string
ClientOptions []func(*Options)
}
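// Uploader uploads objects to OSS, splitting large payloads into parts that
// are transferred concurrently. It works with both a plain Client and an
// EncryptionClient.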
type Uploader struct {
options UploaderOptions
client UploadAPIClient
featureFlags FeatureFlagsType
isEncryptionClient bool
}
// NewUploader creates a new Uploader instance to upload objects.
// Pass in additional functional options to customize the uploader's behavior.
func NewUploader(c UploadAPIClient, optFns ...func(*UploaderOptions)) *Uploader {
options := UploaderOptions{
PartSize: DefaultUploadPartSize,
ParallelNum: DefaultUploadParallel,
LeavePartsOnError: false,
}
for _, fn := range optFns {
fn(&options)
}
u := &Uploader{
client: c,
options: options,
isEncryptionClient: false,
}
	// Detect the concrete client type to pick up feature flags and encryption support.
switch t := c.(type) {
case *Client:
u.featureFlags = t.options.FeatureFlags
case *EncryptionClient:
u.featureFlags = t.Unwrap().options.FeatureFlags
u.isEncryptionClient = true
}
return u
}
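// UploadResult is the result of a completed upload.
// UploadId is only set for multipart uploads.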
type UploadResult struct {
UploadId *string
ETag *string
VersionId *string
HashCRC64 *string
ResultCommon
}
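// UploadError wraps the underlying error of a failed upload together with
// the upload id and the object path.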
type UploadError struct {
Err error
UploadId string
Path string
}
func (m *UploadError) Error() string {
var extra string
if m.Err != nil {
extra = fmt.Sprintf(", cause: %s", m.Err.Error())
}
return fmt.Sprintf("upload failed, upload id: %s%s", m.UploadId, extra)
}
func (m *UploadError) Unwrap() error {
return m.Err
}
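// UploadFrom uploads the content read from body. Bodies that do not support
// random access are buffered into memory one part at a time.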
func (u *Uploader) UploadFrom(ctx context.Context, request *PutObjectRequest, body io.Reader, optFns ...func(*UploaderOptions)) (*UploadResult, error) {
	// Create the delegate that carries this upload's state.
delegate, err := u.newDelegate(ctx, request, optFns...)
if err != nil {
return nil, err
}
delegate.body = body
if err = delegate.applySource(); err != nil {
return nil, err
}
return delegate.upload()
}
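// UploadFile uploads the local file at filePath. When checkpoints are
// enabled, an interrupted upload can be resumed from the parts already
// uploaded.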
func (u *Uploader) UploadFile(ctx context.Context, request *PutObjectRequest, filePath string, optFns ...func(*UploaderOptions)) (*UploadResult, error) {
	// Create the delegate that carries this upload's state.
delegate, err := u.newDelegate(ctx, request, optFns...)
if err != nil {
return nil, err
}
	// Validate the source file and record its info.
if err = delegate.checkSource(filePath); err != nil {
return nil, err
}
var file *os.File
if file, err = delegate.openReader(); err != nil {
return nil, err
}
delegate.body = file
if err = delegate.applySource(); err != nil {
return nil, err
}
if err = delegate.checkCheckpoint(); err != nil {
return nil, err
}
if err = delegate.adjustSource(); err != nil {
return nil, err
}
result, err := delegate.upload()
return result, delegate.closeReader(file, err)
}
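// uploaderDelegate carries the per-call state of a single upload.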
type uploaderDelegate struct {
base *Uploader
options UploaderOptions
client UploadAPIClient
context context.Context
request *PutObjectRequest
body io.Reader
readerPos int64
totalSize int64
hashCRC64 uint64
transferred int64
	// source info, from a file or a reader
filePath string
fileInfo os.FileInfo
// for resumable upload
uploadId string
partNumber int32
cseContext *EncryptionMultiPartContext
uploadedParts []Part
partPool byteSlicePool
checkpoint *uploadCheckpoint
}
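// uploadIdInfo describes the multipart upload to use: its id, the part
// number to start from, and the client-side encryption context, if any.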
type uploadIdInfo struct {
uploadId string
startNum int32
cseContext *EncryptionMultiPartContext
}
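// newDelegate validates the request and builds a delegate with the merged
// per-call options.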
func (u *Uploader) newDelegate(ctx context.Context, request *PutObjectRequest, optFns ...func(*UploaderOptions)) (*uploaderDelegate, error) {
if request == nil {
return nil, NewErrParamNull("request")
}
if request.Bucket == nil {
return nil, NewErrParamNull("request.Bucket")
}
if request.Key == nil {
return nil, NewErrParamNull("request.Key")
}
d := uploaderDelegate{
base: u,
options: u.options,
client: u.client,
context: ctx,
request: request,
}
for _, opt := range optFns {
opt(&d.options)
}
if d.options.ParallelNum <= 0 {
d.options.ParallelNum = DefaultUploadParallel
}
if d.options.PartSize <= 0 {
d.options.PartSize = DefaultUploadPartSize
}
	// sequential mode requires parts to arrive in order, so force a single worker
	if _, ok := d.request.Parameters["sequential"]; ok {
		d.options.ParallelNum = 1
	}
return &d, nil
}
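// checkSource validates filePath and records the file's info for
// checkpointing.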
func (u *uploaderDelegate) checkSource(filePath string) error {
if filePath == "" {
return NewErrParamRequired("filePath")
}
info, err := os.Stat(filePath)
if err != nil {
if os.IsNotExist(err) {
return fmt.Errorf("File not exists, %v", filePath)
}
return err
}
u.filePath = filePath
u.fileInfo = info
return nil
}
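// applySource determines the total size of the body and grows the part size
// if needed so the part count stays within MaxUploadParts.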
func (u *uploaderDelegate) applySource() error {
if u.body == nil {
return NewErrParamNull("the body is null")
}
totalSize := GetReaderLen(u.body)
	// Grow the part size until the part count fits within MaxUploadParts.
partSize := u.options.PartSize
if totalSize > 0 {
for totalSize/partSize >= int64(MaxUploadParts) {
partSize += u.options.PartSize
}
}
u.totalSize = totalSize
u.options.PartSize = partSize
return nil
}
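// adjustSource tries to resume from u.uploadId: it lists the parts already
// uploaded, seeks the body past the consecutive ones, and restores the
// running CRC and encryption context. On any failure it clears the upload
// id and falls back to a fresh upload.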
func (u *uploaderDelegate) adjustSource() error {
	// resume from an existing upload id
	if u.uploadId != "" {
		// resuming requires a seekable body
		r, ok := u.body.(io.Seeker)
		if !ok {
			// not seekable, fall back to a fresh upload
			u.uploadId = ""
			return nil
		}
		// validate the upload id by listing the parts already uploaded
paginator := NewListPartsPaginator(u.client, &ListPartsRequest{
Bucket: u.request.Bucket,
Key: u.request.Key,
UploadId: Ptr(u.uploadId),
})
		// find the longest consecutive run of parts, starting from part number 1
var (
checkPartNumber int32 = 1
updateCRC64 bool = ((u.base.featureFlags & FeatureEnableCRC64CheckUpload) > 0)
hashCRC64 uint64 = 0
page *ListPartsResult
err error
uploadedParts []Part
)
outerLoop:
for paginator.HasNext() {
page, err = paginator.NextPage(u.context, u.options.ClientOptions...)
if err != nil {
u.uploadId = ""
return nil
}
for _, p := range page.Parts {
if p.PartNumber != checkPartNumber ||
p.Size != u.options.PartSize {
break outerLoop
}
checkPartNumber++
uploadedParts = append(uploadedParts, p)
if updateCRC64 && p.HashCRC64 != nil {
value, _ := strconv.ParseUint(ToString(p.HashCRC64), 10, 64)
hashCRC64 = CRC64Combine(hashCRC64, value, uint64(p.Size))
}
}
}
partNumber := checkPartNumber - 1
newOffset := int64(partNumber) * u.options.PartSize
if _, err := r.Seek(newOffset, io.SeekStart); err != nil {
u.uploadId = ""
return nil
}
cseContext, err := u.resumeCSEContext(page)
if err != nil {
u.uploadId = ""
return nil
}
u.partNumber = partNumber
u.readerPos = newOffset
u.hashCRC64 = hashCRC64
u.cseContext = cseContext
u.uploadedParts = uploadedParts
}
return nil
}
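// checkCheckpoint loads the checkpoint file when checkpointing is enabled
// and restores the recorded upload id. Parts are then kept on error so a
// later run can resume.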
func (d *uploaderDelegate) checkCheckpoint() error {
if d.options.EnableCheckpoint {
d.checkpoint = newUploadCheckpoint(d.request, d.filePath, d.options.CheckpointDir, d.fileInfo, d.options.PartSize)
if err := d.checkpoint.load(); err != nil {
return err
}
if d.checkpoint.Loaded {
d.uploadId = d.checkpoint.Info.Data.UploadInfo.UploadId
}
d.options.LeavePartsOnError = true
}
return nil
}
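// openReader opens the source file and uses it as the upload body.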
func (d *uploaderDelegate) openReader() (*os.File, error) {
file, err := os.Open(d.filePath)
if err != nil {
return nil, err
}
d.body = file
return file, nil
}
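// closeReader closes the source file and, if the upload succeeded, removes
// the checkpoint file.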
func (d *uploaderDelegate) closeReader(file *os.File, err error) error {
if file != nil {
file.Close()
}
if d.checkpoint != nil && err == nil {
d.checkpoint.remove()
}
d.body = nil
d.checkpoint = nil
return err
}
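// resumeCSEContext rebuilds the client-side encryption context from a
// ListParts response so an encrypted multipart upload can be resumed.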
func (d *uploaderDelegate) resumeCSEContext(result *ListPartsResult) (*EncryptionMultiPartContext, error) {
if !d.base.isEncryptionClient {
return nil, nil
}
sc, ok := d.client.(*EncryptionClient)
if !ok {
return nil, fmt.Errorf("Not EncryptionClient")
}
envelope, err := getEnvelopeFromListParts(result)
if err != nil {
return nil, err
}
cc, err := sc.defualtCCBuilder.ContentCipherEnv(envelope)
if err != nil {
return nil, err
}
cseContext := &EncryptionMultiPartContext{
ContentCipher: cc,
PartSize: ToInt64(result.ClientEncryptionPartSize),
DataSize: ToInt64(result.ClientEncryptionDataSize),
}
if !cseContext.Valid() {
return nil, fmt.Errorf("EncryptionMultiPartContext is invalid")
}
return cseContext, nil
}
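// upload dispatches to a single PutObject call for small payloads and to a
// multipart upload otherwise (including when the size is unknown).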
func (u *uploaderDelegate) upload() (*UploadResult, error) {
if u.totalSize >= 0 && u.totalSize < u.options.PartSize {
return u.singlePart()
}
return u.multiPart()
}
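// singlePart uploads the whole body with one PutObject call.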
func (u *uploaderDelegate) singlePart() (*UploadResult, error) {
request := &PutObjectRequest{}
copyRequest(request, u.request)
request.Body = u.body
if request.ContentType == nil {
request.ContentType = u.getContentType()
}
result, err := u.client.PutObject(u.context, request, u.options.ClientOptions...)
if err != nil {
return nil, u.wrapErr("", err)
}
return &UploadResult{
ETag: result.ETag,
VersionId: result.VersionId,
HashCRC64: result.HashCRC64,
ResultCommon: result.ResultCommon,
}, nil
}
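// nextReader returns a reader over the next part. Bodies that implement
// io.ReaderAt and io.ReadSeeker are wrapped in a SectionReader without
// copying; other readers are drained part-by-part into pooled buffers.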
func (u *uploaderDelegate) nextReader() (io.ReadSeeker, int, func(), error) {
type readerAtSeeker interface {
io.ReaderAt
io.ReadSeeker
}
switch r := u.body.(type) {
case readerAtSeeker:
var err error
n := u.options.PartSize
if u.totalSize >= 0 {
bytesLeft := u.totalSize - u.readerPos
if bytesLeft <= u.options.PartSize {
err = io.EOF
n = bytesLeft
}
}
reader := io.NewSectionReader(r, u.readerPos, n)
cleanup := func() {}
u.readerPos += n
return reader, int(n), cleanup, err
default:
if u.partPool == nil {
u.partPool = newByteSlicePool(u.options.PartSize)
u.partPool.ModifyCapacity(u.options.ParallelNum + 1)
}
part, err := u.partPool.Get(u.context)
if err != nil {
return nil, 0, func() {}, err
}
n, err := readFill(r, *part)
u.readerPos += int64(n)
cleanup := func() {
u.partPool.Put(part)
}
return bytes.NewReader((*part)[0:n]), n, cleanup, err
}
}
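// uploaderChunk is one part's worth of data queued to an upload worker.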
type uploaderChunk struct {
partNum int32
size int
body io.ReadSeeker
cleanup func()
}
type uploadPartCRC struct {
partNumber int32
size int
hashCRC64 *string
}
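// uploadPartCRCs implements sort.Interface so per-part CRCs can be combined
// in part-number order.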
type uploadPartCRCs []uploadPartCRC
func (slice uploadPartCRCs) Len() int {
return len(slice)
}
func (slice uploadPartCRCs) Less(i, j int) bool {
return slice[i].partNumber < slice[j].partNumber
}
func (slice uploadPartCRCs) Swap(i, j int) {
slice[i], slice[j] = slice[j], slice[i]
}
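// multiPart performs the concurrent multipart upload: it obtains (or
// resumes) an upload id, fans chunks out to ParallelNum workers, completes
// the upload, and, when CRC checking is enabled, verifies the combined
// CRC-64 against the response header.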
func (u *uploaderDelegate) multiPart() (*UploadResult, error) {
release := func() {
if u.partPool != nil {
u.partPool.Close()
}
}
defer release()
var (
wg sync.WaitGroup
mu sync.Mutex
parts UploadParts
errValue atomic.Value
crcParts uploadPartCRCs
enableCRC = (u.base.featureFlags & FeatureEnableCRC64CheckUpload) > 0
)
	// Initiate the multipart upload, or reuse a resumed upload id.
uploadIdInfo, err := u.getUploadId()
if err != nil {
return nil, u.wrapErr("", err)
}
//fmt.Printf("getUploadId result: %v, %#v\n", uploadId, err)
uploadId := uploadIdInfo.uploadId
startPartNum := uploadIdInfo.startNum
	// Persist the upload id to the checkpoint file.
if u.checkpoint != nil {
u.checkpoint.Info.Data.UploadInfo.UploadId = uploadId
u.checkpoint.dump()
}
saveErrFn := func(e error) {
errValue.Store(e)
}
getErrFn := func() error {
v := errValue.Load()
if v == nil {
return nil
}
e, _ := v.(error)
return e
}
	// readChunkFn runs in each worker goroutine, pulling chunks off the ch channel and uploading them.
readChunkFn := func(ch chan uploaderChunk) {
defer wg.Done()
for {
data, ok := <-ch
if !ok {
break
}
if getErrFn() == nil {
upResult, err := u.client.UploadPart(
u.context,
&UploadPartRequest{
Bucket: u.request.Bucket,
Key: u.request.Key,
UploadId: Ptr(uploadId),
PartNumber: data.partNum,
Body: data.body,
CSEMultiPartContext: uploadIdInfo.cseContext,
RequestPayer: u.request.RequestPayer,
},
u.options.ClientOptions...)
//fmt.Printf("UploadPart result: %#v, %#v\n", upResult, err)
if err == nil {
mu.Lock()
parts = append(parts, UploadPart{ETag: upResult.ETag, PartNumber: data.partNum})
if enableCRC {
crcParts = append(crcParts,
uploadPartCRC{partNumber: data.partNum, hashCRC64: upResult.HashCRC64, size: data.size})
}
if u.request.ProgressFn != nil {
u.transferred += int64(data.size)
u.request.ProgressFn(int64(data.size), u.transferred, u.totalSize)
}
mu.Unlock()
} else {
saveErrFn(err)
}
}
data.cleanup()
}
}
ch := make(chan uploaderChunk, u.options.ParallelNum)
for i := 0; i < u.options.ParallelNum; i++ {
wg.Add(1)
go readChunkFn(ch)
}
// Read and queue the parts
var (
qnum int32 = startPartNum
		qerr error
)
	// account for parts uploaded in a previous attempt
if u.readerPos > 0 {
for _, p := range u.uploadedParts {
parts = append(parts, UploadPart{PartNumber: p.PartNumber, ETag: p.ETag})
}
if u.request.ProgressFn != nil {
u.transferred = u.readerPos
u.request.ProgressFn(u.readerPos, u.transferred, u.totalSize)
}
}
for getErrFn() == nil && qerr == nil {
var (
reader io.ReadSeeker
nextChunkLen int
cleanup func()
)
reader, nextChunkLen, cleanup, qerr = u.nextReader()
		// stop on a read error other than EOF
		if qerr != nil && qerr != io.EOF {
			cleanup()
			saveErrFn(qerr)
			break
		}
		// an empty trailing chunk means there is nothing left to upload
		if nextChunkLen == 0 {
			cleanup()
			break
		}
qnum++
//fmt.Printf("send chunk: %d\n", qnum)
ch <- uploaderChunk{body: reader, partNum: qnum, cleanup: cleanup, size: nextChunkLen}
}
// Close the channel, wait for workers
close(ch)
wg.Wait()
// Complete upload
var cmResult *CompleteMultipartUploadResult
if err = getErrFn(); err == nil {
sort.Sort(parts)
cmRequest := &CompleteMultipartUploadRequest{}
copyRequest(cmRequest, u.request)
cmRequest.UploadId = Ptr(uploadId)
cmRequest.CompleteMultipartUpload = &CompleteMultipartUpload{Parts: parts}
cmResult, err = u.client.CompleteMultipartUpload(u.context, cmRequest, u.options.ClientOptions...)
}
//fmt.Printf("CompleteMultipartUpload cmResult: %#v, %#v\n", cmResult, err)
if err != nil {
		// Abort the multipart upload unless parts are kept for a later resume.
if !u.options.LeavePartsOnError {
abortRequest := &AbortMultipartUploadRequest{}
copyRequest(abortRequest, u.request)
abortRequest.UploadId = Ptr(uploadId)
_, _ = u.client.AbortMultipartUpload(u.context, abortRequest, u.options.ClientOptions...)
}
return nil, u.wrapErr(uploadId, err)
}
if enableCRC {
		calcCRC := fmt.Sprint(u.combineCRC(crcParts))
		if err = checkResponseHeaderCRC64(calcCRC, cmResult.Headers); err != nil {
return nil, u.wrapErr(uploadId, err)
}
}
return &UploadResult{
UploadId: Ptr(uploadId),
ETag: cmResult.ETag,
VersionId: cmResult.VersionId,
HashCRC64: cmResult.HashCRC64,
ResultCommon: cmResult.ResultCommon,
}, nil
}
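// getUploadId returns the resumed upload id when one was restored, and
// otherwise initiates a new multipart upload.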
func (u *uploaderDelegate) getUploadId() (info uploadIdInfo, err error) {
if u.uploadId != "" {
return uploadIdInfo{
uploadId: u.uploadId,
startNum: u.partNumber,
cseContext: u.cseContext,
}, nil
}
	// no resumable upload id, create a new one
request := &InitiateMultipartUploadRequest{}
copyRequest(request, u.request)
if request.ContentType == nil {
request.ContentType = u.getContentType()
}
if u.base.isEncryptionClient {
request.CSEPartSize = &u.options.PartSize
request.CSEDataSize = &u.totalSize
}
result, err := u.client.InitiateMultipartUpload(u.context, request, u.options.ClientOptions...)
if err != nil {
return info, err
}
return uploadIdInfo{
uploadId: *result.UploadId,
startNum: 0,
cseContext: result.CSEMultiPartContext,
}, nil
}
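// getContentType guesses the content type from the file extension, if a
// file path is known.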
func (u *uploaderDelegate) getContentType() *string {
if u.filePath != "" {
if contentType := TypeByExtension(u.filePath); contentType != "" {
return Ptr(contentType)
}
}
return nil
}
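// wrapErr wraps err in an UploadError carrying the upload id and the
// oss://bucket/key path.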
func (u *uploaderDelegate) wrapErr(uploadId string, err error) error {
return &UploadError{
UploadId: uploadId,
Path: fmt.Sprintf("oss://%s/%s", *u.request.Bucket, *u.request.Key),
Err: err}
}
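// combineCRC merges the per-part CRC-64 values in part-number order,
// seeded with the CRC of any resumed parts. It returns 0 if any part is
// missing a CRC, signalling that the check cannot be performed.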
func (u *uploaderDelegate) combineCRC(crcs uploadPartCRCs) uint64 {
if len(crcs) == 0 {
return 0
}
sort.Sort(crcs)
crc := u.hashCRC64
for _, c := range crcs {
if c.hashCRC64 == nil {
return 0
}
if value, err := strconv.ParseUint(*c.hashCRC64, 10, 64); err == nil {
crc = CRC64Combine(crc, value, uint64(c.size))
} else {
break
}
}
return crc
}