in registry/storage/driver/s3-aws/v1/s3.go [490:592]
// copy duplicates the object stored at sourcePath to destPath inside d.Bucket.
//
// Objects whose size is at most d.MultipartCopyThresholdSize are copied with a
// single CopyObject request. Larger objects are copied through a multipart
// upload whose parts are server-side copies of consecutive
// d.MultipartCopyChunkSize byte ranges of the source; the limiter channel
// keeps at most d.MultipartCopyMaxConcurrency part copies in flight at once.
//
// If a part copy or the final completion fails, the remaining part copies are
// cancelled and the multipart upload is aborted (best effort) so the parts
// that were already copied are not left behind in the bucket. The first part
// error received is returned unchanged.
func (d *driver) copy(ctx context.Context, sourcePath, destPath string) error {
	// S3 can copy objects up to 5 GB in size with a single PUT Object - Copy
	// operation. For larger objects, the multipart upload API must be used.
	//
	// Empirically, multipart copy is fastest with 32 MB parts and is faster
	// than PUT Object - Copy for objects larger than 32 MB.
	fileInfo, err := d.Stat(ctx, sourcePath)
	if err != nil {
		return parseError(sourcePath, err)
	}
	size := fileInfo.Size()

	if size <= d.MultipartCopyThresholdSize {
		_, err = d.S3.CopyObjectWithContext(
			ctx,
			&s3.CopyObjectInput{
				Bucket:               aws.String(d.Bucket),
				Key:                  aws.String(d.s3Path(destPath)),
				ContentType:          d.getContentType(),
				ACL:                  d.getACL(),
				ServerSideEncryption: d.getEncryptionMode(),
				SSEKMSKeyId:          d.getSSEKMSKeyID(),
				StorageClass:         d.getStorageClass(),
				CopySource:           aws.String(d.Bucket + "/" + d.s3Path(sourcePath)),
			})
		if err != nil {
			return parseError(sourcePath, err)
		}
		return nil
	}

	createResp, err := d.S3.CreateMultipartUploadWithContext(
		ctx,
		&s3.CreateMultipartUploadInput{
			Bucket:               aws.String(d.Bucket),
			Key:                  aws.String(d.s3Path(destPath)),
			ContentType:          d.getContentType(),
			ACL:                  d.getACL(),
			SSEKMSKeyId:          d.getSSEKMSKeyID(),
			ServerSideEncryption: d.getEncryptionMode(),
			StorageClass:         d.getStorageClass(),
		})
	if err != nil {
		return err
	}

	// abort discards the multipart upload created above. It is only called on
	// failure paths, after every part-copy goroutine has reported its result.
	abort := func() {
		// The caller's context may be the very reason the copy failed; fall
		// back to a background context so the cleanup request can still be
		// sent.
		// NOTE(review): the fallback context carries no deadline; consider
		// bounding it with a timeout.
		abortCtx := ctx
		if ctx.Err() != nil {
			abortCtx = context.Background()
		}
		// Best effort: the copy error is what the caller needs to see, so a
		// failure to clean up is intentionally not reported.
		_, _ = d.S3.AbortMultipartUploadWithContext(
			abortCtx,
			&s3.AbortMultipartUploadInput{
				Bucket:   aws.String(d.Bucket),
				Key:      aws.String(d.s3Path(destPath)),
				UploadId: createResp.UploadId,
			})
	}

	numParts := (size + d.MultipartCopyChunkSize - 1) / d.MultipartCopyChunkSize
	completedParts := make([]*s3.CompletedPart, numParts)
	// Buffered for one result per part so no goroutine ever blocks on send.
	errChan := make(chan error, numParts)

	// partCtx lets the first failure stop part copies that are still queued
	// or in flight instead of letting them run to completion for nothing.
	partCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Reduce the client/server exposure to long lived connections regardless of
	// how many requests per second are allowed.
	limiter := make(chan struct{}, d.MultipartCopyMaxConcurrency)
	for i := range completedParts {
		i := int64(i) // nolint: gosec // index will always be a non-negative number
		go func() {
			// Wait for a free slot, but give up as soon as the copy has been
			// cancelled. Every goroutine sends exactly one value on errChan.
			select {
			case limiter <- struct{}{}:
			case <-partCtx.Done():
				errChan <- partCtx.Err()
				return
			}
			defer func() { <-limiter }()

			firstByte := i * d.MultipartCopyChunkSize
			lastByte := firstByte + d.MultipartCopyChunkSize - 1
			if lastByte >= size {
				// The final part is usually shorter than a full chunk.
				lastByte = size - 1
			}
			uploadResp, err := d.S3.UploadPartCopyWithContext(
				partCtx,
				&s3.UploadPartCopyInput{
					Bucket:          aws.String(d.Bucket),
					CopySource:      aws.String(d.Bucket + "/" + d.s3Path(sourcePath)),
					Key:             aws.String(d.s3Path(destPath)),
					PartNumber:      aws.Int64(i + 1),
					UploadId:        createResp.UploadId,
					CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", firstByte, lastByte)),
				})
			if err == nil {
				// Each goroutine writes only its own slot, so no locking is
				// needed; part numbers are 1-based.
				completedParts[i] = &s3.CompletedPart{
					ETag:       uploadResp.CopyPartResult.ETag,
					PartNumber: aws.Int64(i + 1),
				}
			}
			errChan <- err
		}()
	}

	// Collect one result per part. The first error wins; later ones are
	// typically just the cancellation it triggered. Draining every result
	// guarantees no part copy is still running when the upload is aborted.
	var copyErr error
	for range completedParts {
		if err := <-errChan; err != nil && copyErr == nil {
			copyErr = err
			cancel()
		}
	}
	if copyErr != nil {
		abort()
		return copyErr
	}

	_, err = d.S3.CompleteMultipartUploadWithContext(
		ctx,
		&s3.CompleteMultipartUploadInput{
			Bucket:          aws.String(d.Bucket),
			Key:             aws.String(d.s3Path(destPath)),
			UploadId:        createResp.UploadId,
			MultipartUpload: &s3.CompletedMultipartUpload{Parts: completedParts},
		})
	if err != nil {
		abort()
	}
	return err
}