in registry/storage/driver/s3-aws/v2/s3.go [598:710]
// copy duplicates the object stored at sourcePath to destPath inside the
// driver's bucket.
//
// Objects whose size does not exceed d.MultipartCopyThresholdSize are copied
// with a single CopyObject request. Larger objects are copied through a
// multipart upload whose parts are produced by UploadPartCopy requests of
// d.MultipartCopyChunkSize bytes each, with at most
// d.MultipartCopyMaxConcurrency requests in flight at a time.
//
// If a part copy or the final CompleteMultipartUpload fails, the multipart
// upload is aborted so that the parts copied so far are not left behind in
// the bucket. The original failure is always returned; if the abort itself
// fails, its error is wrapped alongside the original one.
func (d *driver) copy(ctx context.Context, sourcePath, destPath string) error {
	// S3 can copy objects up to 5 GB in size with a single PUT Object - Copy
	// operation. For larger objects, the multipart upload API must be used.
	//
	// Empirically, multipart copy is fastest with 32 MB parts and is faster
	// than PUT Object - Copy for objects larger than 32 MB.
	fileInfo, err := d.Stat(ctx, sourcePath)
	if err != nil {
		return parseError(sourcePath, err)
	}

	size := fileInfo.Size()
	destKey := d.s3Path(destPath)
	copySource := d.Bucket + "/" + d.s3Path(sourcePath)

	if size <= d.MultipartCopyThresholdSize {
		copyArgs := &s3.CopyObjectInput{
			Bucket:               ptr.String(d.Bucket),
			Key:                  ptr.String(destKey),
			ContentType:          d.getContentType(),
			ACL:                  d.getACL(),
			ServerSideEncryption: d.getEncryptionMode(),
			SSEKMSKeyId:          d.getSSEKMSKeyID(),
			StorageClass:         d.getStorageClass(),
			CopySource:           ptr.String(copySource),
		}
		if !d.ChecksumDisabled {
			copyArgs.ChecksumAlgorithm = d.ChecksumAlgorithm
		}
		if _, err := d.S3.CopyObject(ctx, copyArgs); err != nil {
			return parseError(sourcePath, err)
		}
		return nil
	}

	createArgs := &s3.CreateMultipartUploadInput{
		Bucket:               ptr.String(d.Bucket),
		Key:                  ptr.String(destKey),
		ContentType:          d.getContentType(),
		ACL:                  d.getACL(),
		SSEKMSKeyId:          d.getSSEKMSKeyID(),
		ServerSideEncryption: d.getEncryptionMode(),
		StorageClass:         d.getStorageClass(),
	}
	if !d.ChecksumDisabled {
		createArgs.ChecksumAlgorithm = d.ChecksumAlgorithm
	}
	createResp, err := d.S3.CreateMultipartUpload(ctx, createArgs)
	if err != nil {
		return err
	}

	// abort discards the multipart upload after a failure and returns the
	// error the caller should see. The request uses a context detached from
	// ctx's cancellation: a cancelled ctx is a likely reason for ending up
	// here, and the cleanup request must still be sent in that case.
	abort := func(cause error) error {
		_, abortErr := d.S3.AbortMultipartUpload(
			context.WithoutCancel(ctx),
			&s3.AbortMultipartUploadInput{
				Bucket:   ptr.String(d.Bucket),
				Key:      ptr.String(destKey),
				UploadId: createResp.UploadId,
			})
		if abortErr != nil {
			return fmt.Errorf("%w; aborting multipart upload: %w", cause, abortErr)
		}
		return cause
	}

	// Ceiling division: the last part may be shorter than the chunk size.
	numParts := (size + d.MultipartCopyChunkSize - 1) / d.MultipartCopyChunkSize
	completedParts := make([]types.CompletedPart, numParts)

	g, gctx := errgroup.WithContext(ctx)
	// Reduce the client/server exposure to long lived connections regardless of
	// how many requests per second are allowed.
	g.SetLimit(d.MultipartCopyMaxConcurrency)
	for i := range completedParts {
		g.Go(func() error {
			// Skip the request if another goroutine has already failed or
			// the caller cancelled the operation.
			if err := gctx.Err(); err != nil {
				return err
			}

			// Inclusive byte range of this part, clamped to the object end.
			firstByte := int64(i) * d.MultipartCopyChunkSize
			lastByte := min(firstByte+d.MultipartCopyChunkSize, size) - 1

			uploadResp, err := d.S3.UploadPartCopy(
				gctx,
				&s3.UploadPartCopyInput{
					Bucket:          ptr.String(d.Bucket),
					CopySource:      ptr.String(copySource),
					Key:             ptr.String(destKey),
					PartNumber:      ptr.Int32(int32(i + 1)), // nolint: gosec // index will always be a non-negative number
					UploadId:        createResp.UploadId,
					CopySourceRange: ptr.String(fmt.Sprintf("bytes=%d-%d", firstByte, lastByte)),
				})
			if err != nil {
				return fmt.Errorf("multipart upload %d failed: %w", i, err)
			}

			// Each goroutine writes only its own slot, so no locking is
			// needed around completedParts.
			completedParts[i] = types.CompletedPart{
				ETag:              uploadResp.CopyPartResult.ETag,
				PartNumber:        ptr.Int32(int32(i + 1)), // nolint: gosec // index will always be a non-negative number
				ChecksumCRC32:     uploadResp.CopyPartResult.ChecksumCRC32,
				ChecksumCRC32C:    uploadResp.CopyPartResult.ChecksumCRC32C,
				ChecksumCRC64NVME: uploadResp.CopyPartResult.ChecksumCRC64NVME,
				ChecksumSHA1:      uploadResp.CopyPartResult.ChecksumSHA1,
				ChecksumSHA256:    uploadResp.CopyPartResult.ChecksumSHA256,
			}
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return abort(err)
	}

	_, err = d.S3.CompleteMultipartUpload(
		ctx,
		&s3.CompleteMultipartUploadInput{
			Bucket:          ptr.String(d.Bucket),
			Key:             ptr.String(destKey),
			UploadId:        createResp.UploadId,
			MultipartUpload: &types.CompletedMultipartUpload{Parts: completedParts},
		})
	if err != nil {
		return abort(err)
	}
	return nil
}