in registry/storage/driver/s3-aws/v2/s3.go [315:429]
func (d *driver) Writer(ctx context.Context, path string, appendParam bool) (storagedriver.FileWriter, error) {
key := d.s3Path(path)
// NOTE(prozlach): The S3 driver uses multipart uploads to implement
// Container Registry's blob upload protocol (start -> chunks ->
// commit/cancel). Unlike the Azure and Filesystem drivers, which can
// append directly to blobs, this driver cannot: S3 objects are immutable,
// so appends are emulated with S3's native multipart upload API.
//
// Container Registry's abstraction layer does not require data to be visible
// before the final Commit() call. This matches S3's behavior, where uploads
// aren't accessible as regular objects until the multipart upload is
// completed.
//
// When appendParam is false, we create a new multipart upload after
// canceling any existing uploads for this path. This cancellation is
// necessary because CR assumes it always appends to the same file path,
// and S3 can accumulate multiple incomplete multipart uploads for the same
// key. While CR currently uses content-addressable paths (SHAsum) or
// random UIDs, which mitigates collision risks, explicitly cleaning up
// ensures consistent behavior regardless of path construction patterns.
//
// When appendParam is true, we find and resume the existing upload by
// listing parts that have already been uploaded, allowing the writer to
// continue where it left off.
resp, err := d.S3.ListMultipartUploads(
ctx,
&s3.ListMultipartUploadsInput{
Bucket: ptr.String(d.Bucket),
Prefix: ptr.String(key),
})
if err != nil {
return nil, parseError(path, err)
}
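// ListMultipartUploads matches on prefix, so the response may also contain
// uploads for keys that merely share this key as a prefix. Uploads are
// returned sorted by key, so the first exact match (if any) marks the start
// of the run of uploads for this exact key.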
idx := slices.IndexFunc(resp.Uploads, func(v types.MultipartUpload) bool { return *v.Key == key })
if !appendParam {
if idx >= 0 {
// Cancel any in-progress uploads for the same path
for _, upload := range resp.Uploads[idx:] {
if *upload.Key != key {
break
}
_, err := d.S3.AbortMultipartUpload(
context.Background(),
&s3.AbortMultipartUploadInput{
Bucket: ptr.String(d.Bucket),
Key: ptr.String(key),
UploadId: ptr.String(*upload.UploadId),
})
if err != nil {
return nil, fmt.Errorf("aborting s3 multipart upload %s: %w", *upload.UploadId, err)
}
}
}
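// Start a fresh multipart upload; the returned writer streams parts into
// it, and the data only becomes visible once the upload is completed on
// Commit().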
inputArgs := &s3.CreateMultipartUploadInput{
Bucket: ptr.String(d.Bucket),
Key: ptr.String(key),
ContentType: d.getContentType(),
ACL: d.getACL(),
ServerSideEncryption: d.getEncryptionMode(),
SSEKMSKeyId: d.getSSEKMSKeyID(),
StorageClass: d.getStorageClass(),
}
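// Request per-part checksums with the configured algorithm unless checksums
// were explicitly disabled in the driver configuration.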
if !d.ChecksumDisabled {
inputArgs.ChecksumAlgorithm = d.ChecksumAlgorithm
}
mpUpload, err := d.S3.CreateMultipartUpload(ctx, inputArgs)
if err != nil {
return nil, fmt.Errorf("creating new multipart upload: %w", err)
}
return d.newWriter(key, *mpUpload.UploadId, nil), nil
}
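// appendParam is true: the caller wants to resume an upload, so an
// in-progress multipart upload must already exist for this key.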
if idx == -1 {
return nil, storagedriver.PathNotFoundError{Path: path, DriverName: common.V2DriverName}
}
mpUpload := resp.Uploads[idx]
// respLoopCount is the number of ListParts response pages traversed. Each
// increment signifies that (at most) one full page of parts was fetched,
// where a full page holds 1,000 uploaded parts of at most the driver's
// chunk size (10485760 bytes by default) each, i.e. roughly 10GiB of data
// per page.
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListParts.html
respLoopCount := 0
allParts := make([]types.Part, 0)
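// Seed IsTruncated so the loop below runs at least once; later iterations
// use the truncation flag and part number marker from the previous page.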
listResp := &s3.ListPartsOutput{
IsTruncated: ptr.Bool(true),
}
for listResp.IsTruncated != nil && *listResp.IsTruncated {
// error out if we have pushed more than 100GB of parts
if respLoopCount > maxListRespLoop {
return nil, ErrMaxListRespExceeded
}
listResp, err = d.S3.ListParts(
ctx,
&s3.ListPartsInput{
Bucket: ptr.String(d.Bucket),
Key: ptr.String(key),
UploadId: mpUpload.UploadId,
PartNumberMarker: listResp.NextPartNumberMarker,
})
if err != nil {
return nil, parseError(path, err)
}
allParts = append(allParts, listResp.Parts...)
respLoopCount++
}
return d.newWriter(key, *mpUpload.UploadId, allParts), nil
}