func()

in registry/storage/driver/s3-aws/v1/s3.go [490:592]


// copy copies the object stored at sourcePath to destPath. Small objects are
// copied with a single PUT Object - Copy; larger ones use a concurrent
// multipart copy. Returns a storagedriver error (via parseError) when the
// source cannot be stat'ed or the single-shot copy fails.
func (d *driver) copy(ctx context.Context, sourcePath, destPath string) error {
	// S3 can copy objects up to 5 GB in size with a single PUT Object - Copy
	// operation. For larger objects, the multipart upload API must be used.
	//
	// Empirically, multipart copy is fastest with 32 MB parts and is faster
	// than PUT Object - Copy for objects larger than 32 MB.

	fileInfo, err := d.Stat(ctx, sourcePath)
	if err != nil {
		return parseError(sourcePath, err)
	}

	// Small enough for a single server-side copy.
	if fileInfo.Size() <= d.MultipartCopyThresholdSize {
		_, err = d.S3.CopyObjectWithContext(
			ctx,
			&s3.CopyObjectInput{
				Bucket:               aws.String(d.Bucket),
				Key:                  aws.String(d.s3Path(destPath)),
				ContentType:          d.getContentType(),
				ACL:                  d.getACL(),
				ServerSideEncryption: d.getEncryptionMode(),
				SSEKMSKeyId:          d.getSSEKMSKeyID(),
				StorageClass:         d.getStorageClass(),
				CopySource:           aws.String(d.Bucket + "/" + d.s3Path(sourcePath)),
			})
		if err != nil {
			return parseError(sourcePath, err)
		}
		return nil
	}

	createResp, err := d.S3.CreateMultipartUploadWithContext(
		ctx,
		&s3.CreateMultipartUploadInput{
			Bucket:               aws.String(d.Bucket),
			Key:                  aws.String(d.s3Path(destPath)),
			ContentType:          d.getContentType(),
			ACL:                  d.getACL(),
			SSEKMSKeyId:          d.getSSEKMSKeyID(),
			ServerSideEncryption: d.getEncryptionMode(),
			StorageClass:         d.getStorageClass(),
		})
	if err != nil {
		return err
	}

	// Ceiling division: number of MultipartCopyChunkSize-sized parts.
	numParts := (fileInfo.Size() + d.MultipartCopyChunkSize - 1) / d.MultipartCopyChunkSize
	completedParts := make([]*s3.CompletedPart, numParts)
	// Buffered so every goroutine can report its result without blocking,
	// even if the receiver has already seen an error.
	errChan := make(chan error, numParts)

	// Reduce the client/server exposure to long lived connections regardless of
	// how many requests per second are allowed.
	limiter := make(chan struct{}, d.MultipartCopyMaxConcurrency)

	for i := range completedParts {
		i := int64(i) // nolint: gosec // index will always be a non-negative number
		go func() {
			limiter <- struct{}{}

			firstByte := i * d.MultipartCopyChunkSize
			lastByte := firstByte + d.MultipartCopyChunkSize - 1
			if lastByte >= fileInfo.Size() {
				// The final part may be shorter than the chunk size.
				lastByte = fileInfo.Size() - 1
			}

			uploadResp, err := d.S3.UploadPartCopyWithContext(
				ctx,
				&s3.UploadPartCopyInput{
					Bucket:          aws.String(d.Bucket),
					CopySource:      aws.String(d.Bucket + "/" + d.s3Path(sourcePath)),
					Key:             aws.String(d.s3Path(destPath)),
					PartNumber:      aws.Int64(i + 1), // part numbers are 1-based
					UploadId:        createResp.UploadId,
					CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", firstByte, lastByte)),
				})
			if err == nil {
				completedParts[i] = &s3.CompletedPart{
					ETag:       uploadResp.CopyPartResult.ETag,
					PartNumber: aws.Int64(i + 1),
				}
			}
			errChan <- err
			<-limiter
		}()
	}

	// Drain ALL results before returning so that no part-copy goroutine
	// outlives this call (each one writes into completedParts). Remember
	// only the first error encountered.
	var copyErr error
	for range completedParts {
		if err := <-errChan; err != nil && copyErr == nil {
			copyErr = err
		}
	}

	if copyErr != nil {
		// Abort the multipart upload so the partially-copied parts do not
		// linger in the bucket (incomplete uploads accrue storage costs
		// until aborted or expired by a lifecycle rule). Best effort: the
		// part-copy error is the one callers care about, so the abort
		// error is intentionally discarded.
		_, _ = d.S3.AbortMultipartUploadWithContext(
			ctx,
			&s3.AbortMultipartUploadInput{
				Bucket:   aws.String(d.Bucket),
				Key:      aws.String(d.s3Path(destPath)),
				UploadId: createResp.UploadId,
			})
		return copyErr
	}

	_, err = d.S3.CompleteMultipartUploadWithContext(
		ctx,
		&s3.CompleteMultipartUploadInput{
			Bucket:          aws.String(d.Bucket),
			Key:             aws.String(d.s3Path(destPath)),
			UploadId:        createResp.UploadId,
			MultipartUpload: &s3.CompletedMultipartUpload{Parts: completedParts},
		})
	return err
}