func (d *driver) copy()

in registry/storage/driver/s3-aws/v2/s3.go [598:710]

func (d *driver) copy(ctx context.Context, sourcePath, destPath string) error {
	// S3 can copy objects up to 5 GB in size with a single PUT Object - Copy
	// operation. For larger objects, the multipart upload API must be used.
	//
	// Empirically, multipart copy is fastest with 32 MB parts and is faster
	// than PUT Object - Copy for objects larger than 32 MB.
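	//
	// For example, with a 32 MB threshold and 32 MB chunk size, a 64 MB
	// object is copied as a two-part multipart upload, while a 16 MB object
	// is copied with a single CopyObject call.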

	fileInfo, err := d.Stat(ctx, sourcePath)
	if err != nil {
		return parseError(sourcePath, err)
	}

	if fileInfo.Size() <= d.MultipartCopyThresholdSize {
		inputArgs := &s3.CopyObjectInput{
			Bucket:               ptr.String(d.Bucket),
			Key:                  ptr.String(d.s3Path(destPath)),
			ContentType:          d.getContentType(),
			ACL:                  d.getACL(),
			ServerSideEncryption: d.getEncryptionMode(),
			SSEKMSKeyId:          d.getSSEKMSKeyID(),
			StorageClass:         d.getStorageClass(),
			CopySource:           ptr.String(d.Bucket + "/" + d.s3Path(sourcePath)),
		}
		if !d.ChecksumDisabled {
			inputArgs.ChecksumAlgorithm = d.ChecksumAlgorithm
		}
		_, err = d.S3.CopyObject(ctx, inputArgs)
		if err != nil {
			return parseError(sourcePath, err)
		}
		return nil
	}

	inputArgs := &s3.CreateMultipartUploadInput{
		Bucket:               ptr.String(d.Bucket),
		Key:                  ptr.String(d.s3Path(destPath)),
		ContentType:          d.getContentType(),
		ACL:                  d.getACL(),
		SSEKMSKeyId:          d.getSSEKMSKeyID(),
		ServerSideEncryption: d.getEncryptionMode(),
		StorageClass:         d.getStorageClass(),
	}
	if !d.ChecksumDisabled {
		inputArgs.ChecksumAlgorithm = d.ChecksumAlgorithm
	}
	createResp, err := d.S3.CreateMultipartUpload(ctx, inputArgs)
	if err != nil {
		return err
	}

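	// Ceiling division: e.g. a 100 MiB object with 32 MiB chunks needs
	// ceil(100/32) = 4 parts, three full and one 4 MiB remainder.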
	numParts := (fileInfo.Size() + d.MultipartCopyChunkSize - 1) / d.MultipartCopyChunkSize
	completedParts := make([]types.CompletedPart, numParts)
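	// Each goroutine below writes only to its own index of completedParts,
	// so the slice needs no additional synchronization.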

	g, gctx := errgroup.WithContext(ctx)

	// Bound the number of concurrent part copies to limit client/server
	// exposure to long-lived connections, independent of any request-rate
	// limits.
	g.SetLimit(d.MultipartCopyMaxConcurrency)

	for i := range completedParts {
		g.Go(func() error {
			// Bail out early if another part copy has already failed and
			// canceled the group context.
			select {
			case <-gctx.Done():
				return gctx.Err()
			default:
				firstByte := int64(i) * d.MultipartCopyChunkSize
				lastByte := firstByte + d.MultipartCopyChunkSize - 1
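				// The last part may be shorter than a full chunk, e.g. for a
				// 100 MiB source, part 4 would otherwise end at byte
				// 134217727 and must be clamped to the final byte, 104857599.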
				if lastByte >= fileInfo.Size() {
					lastByte = fileInfo.Size() - 1
				}

				uploadResp, err := d.S3.UploadPartCopy(
					gctx,
					&s3.UploadPartCopyInput{
						Bucket:          ptr.String(d.Bucket),
						CopySource:      ptr.String(d.Bucket + "/" + d.s3Path(sourcePath)),
						Key:             ptr.String(d.s3Path(destPath)),
						PartNumber:      ptr.Int32(int32(i + 1)), // nolint: gosec // index will always be a non-negative number
						UploadId:        createResp.UploadId,
						CopySourceRange: ptr.String(fmt.Sprintf("bytes=%d-%d", firstByte, lastByte)),
					})
				if err == nil {
					completedParts[i] = types.CompletedPart{
						ETag:              uploadResp.CopyPartResult.ETag,
						PartNumber:        ptr.Int32(int32(i + 1)), // nolint: gosec // index will always be a non-negative number
						ChecksumCRC32:     uploadResp.CopyPartResult.ChecksumCRC32,
						ChecksumCRC32C:    uploadResp.CopyPartResult.ChecksumCRC32C,
						ChecksumCRC64NVME: uploadResp.CopyPartResult.ChecksumCRC64NVME,
						ChecksumSHA1:      uploadResp.CopyPartResult.ChecksumSHA1,
						ChecksumSHA256:    uploadResp.CopyPartResult.ChecksumSHA256,
					}
					return nil
				}
				return fmt.Errorf("multipart copy of part %d failed: %w", i+1, err)
			}
		})
	}

	if err := g.Wait(); err != nil {
		return err
	}

	_, err = d.S3.CompleteMultipartUpload(
		ctx,
		&s3.CompleteMultipartUploadInput{
			Bucket:          ptr.String(d.Bucket),
			Key:             ptr.String(d.s3Path(destPath)),
			UploadId:        createResp.UploadId,
			MultipartUpload: &types.CompletedMultipartUpload{Parts: completedParts},
		})
	return err
}
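
On any failure the function returns without calling AbortMultipartUpload, so a failed copy leaves orphaned parts in the bucket until a lifecycle rule expires them. Below is a minimal cleanup sketch, assuming the driver's existing S3 client, Bucket field, and s3Path helper; the abortOnFailure helper itself is hypothetical, not part of the driver:

// abortOnFailure is a hypothetical helper showing how the multipart error
// paths above could be wrapped so a failed copy does not leak parts. It
// relies only on AbortMultipartUpload from the aws-sdk-go-v2 s3 client the
// driver already holds.
func (d *driver) abortOnFailure(ctx context.Context, destPath string, uploadID *string, copyErr error) error {
	if copyErr == nil {
		return nil
	}
	// Best-effort abort; the original copy error takes precedence.
	_, abortErr := d.S3.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
		Bucket:   ptr.String(d.Bucket),
		Key:      ptr.String(d.s3Path(destPath)),
		UploadId: uploadID,
	})
	if abortErr != nil {
		return fmt.Errorf("copy failed: %w (abort also failed: %v)", copyErr, abortErr)
	}
	return copyErr
}

The multipart error paths above could then return d.abortOnFailure(ctx, destPath, createResp.UploadId, err) instead of returning err directly.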