func()

in registry/storage/driver/s3-aws/v2/s3.go [315:429]


// Writer returns a storagedriver.FileWriter that writes to path through an S3
// multipart upload.
//
// When appendParam is false, every in-progress multipart upload found for the
// same key is aborted and a brand new multipart upload is created. When
// appendParam is true, the first in-progress upload found for the key is
// resumed: its already-uploaded parts are listed and handed to the writer. If
// no such upload exists, a storagedriver.PathNotFoundError is returned.
func (d *driver) Writer(ctx context.Context, path string, appendParam bool) (storagedriver.FileWriter, error) {
	key := d.s3Path(path)

	// NOTE(prozlach): The S3 driver uses multipart uploads to implement
	// Container Registry's blob upload protocol (start -> chunks ->
	// commit/cancel). Unlike Azure and Filesystem drivers that allow direct
	// appending to blobs, S3 objects are immutable and this behavior is
	// worked-around using S3's native multipart upload API.
	//
	// Container Registry's abstraction layer permits that data isn't visible
	// until the final Commit() call. This matches S3's behavior where uploads
	// aren't accessible as regular objects until the multipart upload is
	// completed.
	//
	// When appendParam is false, we create a new multipart upload after
	// canceling any existing uploads for this path. This cancellation is
	// necessary because CR assumes it always appends to the same file path,
	// and S3 can accumulate multiple incomplete multipart uploads for the same
	// key. While CR currently uses content-addressable paths (SHAsum) or
	// random UIDs which mitigates collision risks, explicitly cleaning up
	// ensures consistent behavior regardless of path construction patterns.
	//
	// When appendParam is true, we find and resume the existing upload by
	// listing parts that have already been uploaded, allowing the writer to
	// continue where it left off.

	resp, err := d.S3.ListMultipartUploads(
		ctx,
		&s3.ListMultipartUploadsInput{
			Bucket: ptr.String(d.Bucket),
			Prefix: ptr.String(key),
		})
	if err != nil {
		return nil, parseError(path, err)
	}

	// Prefix matching may also return uploads for longer keys that merely
	// start with key, so locate the first upload whose key matches exactly.
	idx := slices.IndexFunc(resp.Uploads, func(v types.MultipartUpload) bool { return *v.Key == key })

	if !appendParam {
		if idx >= 0 {
			// Cancel any in-progress uploads for the same path. The loop stops
			// at the first upload belonging to a different key.
			for _, upload := range resp.Uploads[idx:] {
				if *upload.Key != key {
					break
				}
				// Use the caller's context, like every other S3 call in this
				// method, so the abort honors cancellation and deadlines.
				_, err := d.S3.AbortMultipartUpload(
					ctx,
					&s3.AbortMultipartUploadInput{
						Bucket:   ptr.String(d.Bucket),
						Key:      ptr.String(key),
						UploadId: ptr.String(*upload.UploadId),
					})
				if err != nil {
					return nil, fmt.Errorf("aborting s3 multipart upload %s: %w", *upload.UploadId, err)
				}
			}
		}

		inputArgs := &s3.CreateMultipartUploadInput{
			Bucket:               ptr.String(d.Bucket),
			Key:                  ptr.String(key),
			ContentType:          d.getContentType(),
			ACL:                  d.getACL(),
			ServerSideEncryption: d.getEncryptionMode(),
			SSEKMSKeyId:          d.getSSEKMSKeyID(),
			StorageClass:         d.getStorageClass(),
		}
		if !d.ChecksumDisabled {
			inputArgs.ChecksumAlgorithm = d.ChecksumAlgorithm
		}
		mpUpload, err := d.S3.CreateMultipartUpload(ctx, inputArgs)
		if err != nil {
			return nil, fmt.Errorf("creating new multipart upload: %w", err)
		}

		return d.newWriter(key, *mpUpload.UploadId, nil), nil
	}

	// Appending requires an existing in-progress upload for this exact key.
	if idx == -1 {
		return nil, storagedriver.PathNotFoundError{Path: path, DriverName: common.V2DriverName}
	}
	mpUpload := resp.Uploads[idx]

	// respLoopCount is the number of response pages traversed. Each increment
	// of respLoopCount signifies that (at most) 1 full page of parts was
	// pushed, where one full page of parts is equivalent to 1,000 uploaded
	// parts which in turn is equivalent to 10485760 bytes of data.
	// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListParts.html
	respLoopCount := 0

	allParts := make([]types.Part, 0)
	// Seed the loop with a synthetic "truncated" response so that the first
	// page is always requested (with a nil PartNumberMarker).
	listResp := &s3.ListPartsOutput{
		IsTruncated: ptr.Bool(true),
	}
	// The pagination state lives in listResp (the ListParts response), not in
	// resp (the ListMultipartUploads response). The nil check also guards the
	// dereference in case a page omits IsTruncated.
	for listResp.IsTruncated != nil && *listResp.IsTruncated {
		// error out if we have pushed more than 100GB of parts
		if respLoopCount > maxListRespLoop {
			return nil, ErrMaxListRespExceeded
		}
		listResp, err = d.S3.ListParts(
			ctx,
			&s3.ListPartsInput{
				Bucket:           ptr.String(d.Bucket),
				Key:              ptr.String(key),
				UploadId:         mpUpload.UploadId,
				PartNumberMarker: listResp.NextPartNumberMarker,
			})
		if err != nil {
			return nil, parseError(path, err)
		}
		allParts = append(allParts, listResp.Parts...)
		respLoopCount++
	}
	return d.newWriter(key, *mpUpload.UploadId, allParts), nil
}