// Package azure provides a storagedriver.StorageDriver implementation to
// store blobs in Microsoft Azure Blob Storage Service.
package v2
import (
"bufio"
"bytes"
"context"
"crypto/md5" // nolint: gosec,revive // ok for content verification
"errors"
"fmt"
"io"
"strings"
"sync/atomic"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/appendblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/cenkalti/backoff/v4"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/azure/common"
"github.com/docker/distribution/registry/storage/driver/base"
)
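// ErrCorruptedData is returned when the data found in the backend does not
// match the data that was sent (detected via MD5 verification while
// recovering from a timed-out append).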
var ErrCorruptedData = errors.New("corrupted data found in the uploaded data")
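// DriverName is the programmatic name of this storage driver, as reported
// by Name().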
const DriverName = "azure_v2"
const (
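// MaxChunkSize is the largest amount of data sent in a single
// AppendBlock call (4 MiB) and the size of the writer's in-memory buffer.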
MaxChunkSize int64 = 4 << 20
// NOTE(prozlach): values chosen arbitrarily
DefaultPoolInitialInterval = 100 * time.Millisecond
DefaultPoolMaxInterval = 1 * time.Second
DefaultPoolMaxElapsedTime = 5 * time.Second
// Defaults match the Azure driver defaults as per
// https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azcore@v1.17.0/policy#RetryOptions
//
DefaultMaxRetries = 3
DefaultRetryTryTimeout = 0 // disabled
DefaultRetryDelay = 4 * time.Second
DefaultMaxRetryDelay = 60 * time.Second
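// DefaultSignedURLExpiry is the validity period applied to SAS URLs
// returned by URLFor when no explicit "expiry" option is provided.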
DefaultSignedURLExpiry = 20 * time.Minute
)
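// ErrCopyStatusPending signals that an asynchronous server-side copy has
// not completed yet; Move uses it to drive the polling backoff loop.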
var ErrCopyStatusPending = errors.New("copy still pending")
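// driver is the concrete storagedriver.StorageDriver implementation. It
// wraps an Azure container client together with URL signing and the
// retry/polling settings applied to every backend call.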
type driver struct {
common.Pather
client *container.Client
signer urlSigner
poolInitialInterval time.Duration
poolMaxInterval time.Duration
poolMaxElapsedTime time.Duration
maxRetries int32
retryTryTimeout time.Duration
retryDelay time.Duration
maxRetryDelay time.Duration
}
type baseEmbed struct{ base.Base }
// Driver is a storagedriver.StorageDriver implementation backed by
// Microsoft Azure Blob Storage Service.
type Driver struct{ baseEmbed }
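// AzureDriverFactory creates azure_v2 storage drivers from a generic
// parameter map.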
type AzureDriverFactory struct{}
func (*AzureDriverFactory) Create(parameters map[string]any) (storagedriver.StorageDriver, error) {
return FromParameters(parameters)
}
// FromParameters constructs a new Driver with a given parameters map.
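// A minimal sketch of how this is typically wired up (the exact parameter
// keys are defined by ParseParameters, which is not part of this file; the
// key names below are illustrative assumptions only):
//
//	d, err := FromParameters(map[string]any{
//		"accountname": "mystorageaccount", // assumed key name
//		"accountkey":  "<base64 key>",     // assumed key name
//		"container":   "registry",         // assumed key name
//	})
//	if err != nil {
//		// handle configuration error
//	}
//	_ = d // d satisfies storagedriver.StorageDriver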
func FromParameters(parameters map[string]any) (storagedriver.StorageDriver, error) {
params, err := ParseParameters(parameters)
if err != nil {
return nil, err
}
return New(params)
}
// New constructs a new Driver with the given Azure Storage Account credentials
func New(in any) (storagedriver.StorageDriver, error) {
params := in.(*DriverParameters)
switch params.CredentialsType {
case common.CredentialsTypeSharedKey:
return newSharedKeyCredentialsClient(params)
case common.CredentialsTypeClientSecret, common.CredentialsTypeDefaultCredentials:
return newTokenClient(params)
default:
return nil, fmt.Errorf("invalid credentials type: %q", params.CredentialsType)
}
}
// Implement the storagedriver.StorageDriver interface.
func (*driver) Name() string {
return DriverName
}
// GetContent retrieves the content stored at "targetPath" as a []byte.
func (d *driver) GetContent(ctx context.Context, targetPath string) ([]byte, error) {
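// Attach per-request retry options to the context so the Azure SDK
// pipeline retries transient failures using the driver's configured
// settings (the same pattern is repeated in every backend-facing method).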
ctxRetry := policy.WithRetryOptions(
ctx,
policy.RetryOptions{
MaxRetries: d.maxRetries,
TryTimeout: d.retryTryTimeout,
RetryDelay: d.retryDelay,
MaxRetryDelay: d.maxRetryDelay,
},
)
resp, err := d.client.NewBlobClient(d.PathToKey(targetPath)).DownloadStream(ctxRetry, nil)
if err != nil {
if Is404(err) {
return nil, storagedriver.PathNotFoundError{Path: targetPath, DriverName: DriverName}
}
return nil, err
}
defer resp.Body.Close()
return io.ReadAll(resp.Body)
}
// PutContent stores the []byte content at a location designated by "path".
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
ctxRetry := policy.WithRetryOptions(
ctx,
policy.RetryOptions{
MaxRetries: d.maxRetries,
TryTimeout: d.retryTryTimeout,
RetryDelay: d.retryDelay,
MaxRetryDelay: d.maxRetryDelay,
},
)
// Max size for block blobs uploaded via a single "Put Blob" call, for service versions after "2016-05-31":
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob#remarks
if len(contents) > blockblob.MaxUploadBlobBytes {
return fmt.Errorf(
"uploading %d bytes with PutContent is not supported; limit: %d bytes",
len(contents),
blockblob.MaxUploadBlobBytes,
)
}
// Historically, blobs uploaded via PutContent used to be of type AppendBlob
// (https://github.com/distribution/distribution/pull/1438). We can't replace
// these blobs atomically via a single "Put Blob" operation without
// deleting them first. Once we detect they are BlockBlob type, we can
// overwrite them with an atomic "Put Blob" operation.
//
// While we delete the blob and create a new one, there will be a small
// window of inconsistency, and if the Put Blob fails, we may end up
// losing the existing data while migrating it to the BlockBlob type.
// However, the expectation is that clients pushing will retry when they
// get an error response.
blobName := d.PathToKey(path)
blobRef := d.client.NewBlobClient(blobName)
props, err := blobRef.GetProperties(ctxRetry, nil)
if err != nil && !Is404(err) {
return fmt.Errorf("failed to get blob properties: %w", err)
}
if err == nil && props.BlobType != nil && *props.BlobType != blob.BlobTypeBlockBlob {
if _, err := blobRef.Delete(ctxRetry, nil); err != nil && !Is404(err) {
return fmt.Errorf("failed to delete legacy blob (%s): %w", *props.BlobType, err)
}
}
_, err = d.client.NewBlockBlobClient(blobName).UploadBuffer(ctxRetry, contents, nil)
if err != nil {
return fmt.Errorf("creating new block blob client: %w", err)
}
return nil
}
// Reader retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
ctxRetry := policy.WithRetryOptions(
ctx,
policy.RetryOptions{
MaxRetries: d.maxRetries,
TryTimeout: d.retryTryTimeout,
RetryDelay: d.retryDelay,
MaxRetryDelay: d.maxRetryDelay,
},
)
blobRef := d.client.NewBlobClient(d.PathToKey(path))
options := blob.DownloadStreamOptions{
Range: blob.HTTPRange{
Offset: offset,
},
}
props, err := blobRef.GetProperties(ctxRetry, nil)
if err != nil {
if Is404(err) {
return nil, storagedriver.PathNotFoundError{Path: path, DriverName: DriverName}
}
return nil, fmt.Errorf("failed to get blob properties: %v", err)
}
if props.ContentLength == nil {
return nil, fmt.Errorf("missing ContentLength: %s", path)
}
size := *props.ContentLength
if offset >= size {
return io.NopCloser(bytes.NewReader(nil)), nil
}
resp, err := blobRef.DownloadStream(ctxRetry, &options)
if err != nil {
if Is404(err) {
return nil, storagedriver.PathNotFoundError{Path: path, DriverName: DriverName}
}
return nil, err
}
return resp.Body, nil
}
// Writer returns a FileWriter which will store the content written to it
// at the location designated by "path" after the call to Commit.
func (d *driver) Writer(ctx context.Context, path string, doAppend bool) (storagedriver.FileWriter, error) {
ctxRetry := policy.WithRetryOptions(
ctx,
policy.RetryOptions{
MaxRetries: d.maxRetries,
TryTimeout: d.retryTryTimeout,
RetryDelay: d.retryDelay,
MaxRetryDelay: d.maxRetryDelay,
},
)
blobName := d.PathToKey(path)
blobRef := d.client.NewBlobClient(blobName)
props, err := blobRef.GetProperties(ctxRetry, nil)
blobExists := true
if err != nil {
if !Is404(err) {
return nil, fmt.Errorf("getting blob properties: %w", err)
}
blobExists = false
}
eTag := props.ETag
// NOTE(prozlach): When an operation times out, Azure specifies that the
// operation might or might not have succeeded and that the client needs
// to verify the state. In our case it does not make much difference: if
// the operation succeeded and the blob was indeed created, the next call
// will simply truncate it, which is fine for us.
//
// https://learn.microsoft.com/en-gb/rest/api/storageservices/put-blob
// https://learn.microsoft.com/en-us/rest/api/storageservices/common-rest-api-error-codes
var size int64
if blobExists {
if doAppend {
if props.ContentLength == nil {
return nil, fmt.Errorf("missing ContentLength: %s", blobName)
}
size = *props.ContentLength
} else {
if _, err := blobRef.Delete(ctxRetry, nil); err != nil && !Is404(err) {
return nil, fmt.Errorf("deleting existing blob before write: %w", err)
}
res, err := d.client.NewAppendBlobClient(blobName).Create(ctxRetry, nil)
if err != nil {
return nil, fmt.Errorf("creating new append blob: %w", err)
}
eTag = res.ETag
}
} else {
if doAppend {
return nil, storagedriver.PathNotFoundError{Path: path, DriverName: DriverName}
}
res, err := d.client.NewAppendBlobClient(blobName).Create(ctxRetry, nil)
if err != nil {
return nil, fmt.Errorf("creating new append blob: %w", err)
}
eTag = res.ETag
}
return d.newWriter(ctxRetry, blobName, size, eTag), nil
}
// Stat retrieves the FileInfo for the given path, including the current size
// in bytes and the creation time.
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
ctxRetry := policy.WithRetryOptions(
ctx,
policy.RetryOptions{
MaxRetries: d.maxRetries,
TryTimeout: d.retryTryTimeout,
RetryDelay: d.retryDelay,
MaxRetryDelay: d.maxRetryDelay,
},
)
// If we try to get "/" as a blob, pathToKey will return "" when no root
// directory is specified and we are not in legacy path mode, which causes
// Azure to return a **400** when that object doesn't exist. So we need to
// skip to trying to list the blobs under "/", which should result in zero
// blobs, so we can return the expected 404 if we don't find any.
if path != "/" {
blobName := d.PathToKey(path)
blobRef := d.client.NewBlobClient(blobName)
// Check if the path is a blob
props, err := blobRef.GetProperties(ctxRetry, nil)
if err == nil {
var missing []string
if props.ContentLength == nil {
missing = append(missing, "ContentLength")
}
if props.LastModified == nil {
missing = append(missing, "LastModified")
}
if len(missing) > 0 {
return nil, fmt.Errorf("missing required properties (%s) for blob %q", strings.Join(missing, ","), blobName)
}
return storagedriver.FileInfoInternal{
FileInfoFields: storagedriver.FileInfoFields{
Path: path,
Size: *props.ContentLength,
ModTime: *props.LastModified,
IsDir: false,
},
}, nil
}
if !Is404(err) {
return nil, fmt.Errorf("fetching blob %q properties: %w", blobName, err)
}
// There is no such blob, let's see if this is a virtual-container.
}
pager := d.client.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{
Prefix: to.Ptr(d.PathToDirKey(path)),
MaxResults: to.Ptr((int32)(1)),
})
for pager.More() {
resp, err := pager.NextPage(ctxRetry)
if err != nil {
return nil, fmt.Errorf("next page when listing blobs: %w", err)
}
if len(resp.Segment.BlobItems) > 0 {
// path is a virtual container
return storagedriver.FileInfoInternal{
FileInfoFields: storagedriver.FileInfoFields{
Path: path,
IsDir: true,
},
}, nil
}
}
// path is not a blob or virtual container
return nil, storagedriver.PathNotFoundError{Path: path, DriverName: DriverName}
}
// List returns a list of objects that are direct descendants of the given path.
func (d *driver) List(ctx context.Context, path string) ([]string, error) {
ctxRetry := policy.WithRetryOptions(
ctx,
policy.RetryOptions{
MaxRetries: d.maxRetries,
TryTimeout: d.retryTryTimeout,
RetryDelay: d.retryDelay,
MaxRetryDelay: d.maxRetryDelay,
},
)
prefix := d.PathToDirKey(path)
// If we aren't using a particular root directory, we should not add the extra
// trailing slash that PathToDirKey adds.
if !d.HasRootDirectory() && path == "/" {
prefix = d.PathToKey(path)
}
list, err := d.listImpl(ctxRetry, prefix)
if err != nil {
return nil, err
}
if path != "/" && len(list) == 0 {
return nil, storagedriver.PathNotFoundError{Path: path, DriverName: DriverName}
}
return list, nil
}
// Move moves an object stored at sourcePath to destPath, removing the original
// object.
// NOTE(prozlach): Azure SDK allows for synchronous copy of files up to 256MiB
// in size, bigger blobs need to be copied asynchronously. In order to keep
// things (esp. testing) simple, we use asynchronous copying for all blob
// sizes.
func (d *driver) Move(ctx context.Context, sourcePath, destPath string) error {
ctxRetry := policy.WithRetryOptions(
ctx,
policy.RetryOptions{
MaxRetries: d.maxRetries,
TryTimeout: d.retryTryTimeout,
RetryDelay: d.retryDelay,
MaxRetryDelay: d.maxRetryDelay,
},
)
srcBlobRef := d.client.NewBlobClient(d.PathToKey(sourcePath))
// NOTE(prozlach): No need to sign the src URL, as the credentials for the
// dst blob will be used for accessing src blob when calling StartCopyFromURL()
srcBlobURL := srcBlobRef.URL()
dstBlobRef := d.client.NewBlobClient(d.PathToKey(destPath))
resp, err := dstBlobRef.StartCopyFromURL(ctxRetry, srcBlobURL, nil)
if err != nil {
if Is404(err) {
return storagedriver.PathNotFoundError{Path: sourcePath, DriverName: DriverName}
}
return err
}
b := backoff.NewExponentialBackOff(
backoff.WithInitialInterval(d.poolInitialInterval),
backoff.WithMaxInterval(d.poolMaxInterval),
backoff.WithMaxElapsedTime(d.poolMaxElapsedTime),
)
ctxB := backoff.WithContext(b, ctxRetry)
// Operation to check copy status
operation := func() error {
props, err := dstBlobRef.GetProperties(ctxRetry, nil)
if err != nil {
// NOTE(prozlach): We do not treat this as a permanent error but
// retry instead, as it may be transient and the copy operation may
// still be progressing on the Azure side.
// In the worst case we will abort the whole copy operation once
// `d.poolMaxElapsedTime` has been reached, in case this error
// turned out to be permanent after all (e.g. due to network
// connectivity issues), but this seems like a lesser evil than
// letting the copy operation finish in the background and having
// both the src and dst files in the backend.
return fmt.Errorf("getting blob properties: %w", err)
}
if props.CopyStatus == nil {
return errors.New("copy status has not been set")
}
switch *props.CopyStatus {
case blob.CopyStatusTypeSuccess:
return nil
case blob.CopyStatusTypePending:
return ErrCopyStatusPending
case blob.CopyStatusTypeAborted:
if props.CopyStatusDescription != nil {
err = fmt.Errorf("move blob with copy id %s has been aborted: %s", *props.CopyID, *props.CopyStatusDescription)
} else {
err = fmt.Errorf("move blob with copy id %s has been aborted", *props.CopyID)
}
return backoff.Permanent(err)
case blob.CopyStatusTypeFailed:
if props.CopyStatusDescription != nil {
err = fmt.Errorf("move blob with copy id %s has failed on the Azure backend: %s", *props.CopyID, *props.CopyStatusDescription)
} else {
err = fmt.Errorf("move blob with copy id %s has failed on the Azure backend", *props.CopyID)
}
return backoff.Permanent(err)
default:
// NOTE(prozlach): this may be a transient error, so give it the benefit
// of the doubt and retry until we have a solid signal to abort
return fmt.Errorf("unknown copy status: %s", *props.CopyStatus)
}
}
// Use backoff retry for polling
err = backoff.Retry(operation, ctxB)
if err != nil {
if errors.Is(err, ErrCopyStatusPending) {
// Blob copy has not finished yet and we can't wait any longer.
// Abort the operation and return the error.
if _, errAbort := dstBlobRef.AbortCopyFromURL(ctxRetry, *resp.CopyID, nil); errAbort != nil {
return fmt.Errorf("aborting copy operation: %w, while handling move operation timeout", errAbort)
}
return fmt.Errorf("move blob did not finish after %s", b.GetElapsedTime())
}
return fmt.Errorf("move blob: %w", err)
}
// Blob might have been already deleted due to retry
if _, err = srcBlobRef.Delete(ctxRetry, nil); err != nil && !Is404(err) {
return fmt.Errorf("deleting source blob: %w", err)
}
return nil
}
// Delete recursively deletes all objects stored at "path" and its subpaths.
func (d *driver) Delete(ctx context.Context, path string) error {
ctxRetry := policy.WithRetryOptions(
ctx,
policy.RetryOptions{
MaxRetries: d.maxRetries,
TryTimeout: d.retryTryTimeout,
RetryDelay: d.retryDelay,
MaxRetryDelay: d.maxRetryDelay,
},
)
blobRef := d.client.NewBlobClient(d.PathToKey(path))
_, err := blobRef.Delete(ctxRetry, nil)
if err == nil {
// was a blob and deleted, return
return nil
}
if !Is404(err) {
return fmt.Errorf("deleting blob %s: %w", path, err)
}
// Not a blob, see if path is a virtual container with blobs
blobs, err := d.listBlobs(ctxRetry, d.PathToDirKey(path))
if err != nil {
return fmt.Errorf("listing blobs in virtual container before deletion: %w", err)
}
if len(blobs) == 0 {
return storagedriver.PathNotFoundError{Path: path, DriverName: DriverName}
}
for _, b := range blobs {
blobRef = d.client.NewBlobClient(d.PathToKey(b))
// Blob might have been already deleted due to retry
if _, err = blobRef.Delete(ctxRetry, nil); err != nil && !Is404(err) {
return fmt.Errorf("deleting blob %s: %w", b, err)
}
}
return nil
}
// DeleteFiles deletes a set of files by iterating over their full path list
// and invoking Delete for each. Returns the number of successfully deleted
// files and any errors. This method is idempotent, no error is returned if a
// file does not exist.
func (d *driver) DeleteFiles(ctx context.Context, paths []string) (int, error) {
count := 0
for _, path := range paths {
if err := d.Delete(ctx, path); err != nil {
if !errors.As(err, new(storagedriver.PathNotFoundError)) {
return count, err
}
}
count++
}
return count, nil
}
// URLFor returns a publicly accessible URL for the blob stored at given path
// for specified duration by making use of Azure Storage Shared Access Signatures (SAS).
// See https://msdn.microsoft.com/en-us/library/azure/ee395415.aspx for more info.
func (d *driver) URLFor(ctx context.Context, path string, options map[string]any) (string, error) {
ctxRetry := policy.WithRetryOptions(
ctx,
policy.RetryOptions{
MaxRetries: d.maxRetries,
TryTimeout: d.retryTryTimeout,
RetryDelay: d.retryDelay,
MaxRetryDelay: d.maxRetryDelay,
},
)
expiresTime := common.SystemClock.Now().UTC().Add(DefaultSignedURLExpiry)
expires, ok := options["expiry"]
if ok {
t, ok := expires.(time.Time)
if ok {
expiresTime = t.UTC()
}
}
blobRef := d.client.NewBlobClient(d.PathToKey(path))
return d.signer.SignBlobURL(ctxRetry, blobRef.URL(), expiresTime)
}
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFallback(ctx, d, path, f)
}
// WalkParallel traverses a filesystem defined within driver in parallel, starting
// from the given path, calling f on each file.
func (d *driver) WalkParallel(ctx context.Context, path string, f storagedriver.WalkFn) error {
// NOTE(prozlach): WalkParallel will go away at some point, see
// https://gitlab.com/gitlab-org/container-registry/-/issues/1182#note_2258251909
// for more context.
return d.Walk(ctx, path, f)
}
// listImpl simulates a filesystem style listImpl in which both files (blobs) and
// directories (virtual containers) are returned for a given prefix.
func (d *driver) listImpl(ctx context.Context, prefix string) ([]string, error) {
return d.listWithDelimiter(ctx, prefix, "/")
}
// listBlobs lists all blobs whose names begin with the specified prefix.
func (d *driver) listBlobs(ctx context.Context, prefix string) ([]string, error) {
return d.listWithDelimiter(ctx, prefix, "")
}
func (d *driver) listWithDelimiter(ctx context.Context, prefix, delimiter string) ([]string, error) {
out := make([]string, 0)
pager := d.client.NewListBlobsHierarchyPager(
delimiter, &container.ListBlobsHierarchyOptions{
Prefix: &prefix,
MaxResults: to.Ptr((int32)(common.ListMax)),
})
for pager.More() {
page, err := pager.NextPage(ctx)
if err != nil {
return nil, fmt.Errorf("next page when listing blobs: %w", err)
}
for _, blobItem := range page.Segment.BlobItems {
if blobItem.Name == nil {
return nil, fmt.Errorf("missing blob Name when listing prefix: %s", prefix)
}
out = append(out, d.KeyToPath(*blobItem.Name))
}
for _, blobPrefix := range page.Segment.BlobPrefixes {
if blobPrefix.Name == nil {
return nil, fmt.Errorf("missing blob prefix Name when listing prefix: %s", prefix)
}
out = append(out, d.KeyToPath(*blobPrefix.Name))
}
}
return out, nil
}
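// Is404 reports whether err represents a "not found" class of Azure blob
// errors, including CannotVerifyCopySource, which Azure returns with a 404
// status when the source blob of a copy does not exist.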
func Is404(err error) bool {
return bloberror.HasCode(
err,
bloberror.BlobNotFound,
bloberror.ContainerNotFound,
bloberror.ResourceNotFound,
// Azure will return CannotVerifyCopySource with a 404 status code from
// a call to Move when the source blob does not exist. Details:
//
// https://learn.microsoft.com/en-us/rest/api/storageservices/blob-service-error-codes.
bloberror.CannotVerifyCopySource,
)
}
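// writer implements storagedriver.FileWriter. It buffers writes in memory
// (up to MaxChunkSize) and flushes them to the append blob through a
// blockWriter; Commit and Close flush any remaining buffered data.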
type writer struct {
driver *driver
path string
size *atomic.Int64
ctx context.Context
bw *bufio.Writer
closed bool
committed bool
canceled bool
}
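// newWriter wires a bufio.Writer on top of a blockWriter so callers can
// stream arbitrary-sized writes while the backend receives chunks of at
// most MaxChunkSize bytes, resuming from the given size and ETag of the
// existing append blob.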
func (d *driver) newWriter(ctx context.Context, path string, size int64, eTag *azcore.ETag) storagedriver.FileWriter {
bw := &blockWriter{
maxRetries: d.maxRetries,
client: d.client,
ctx: ctx,
path: path,
appenPos: new(atomic.Int64),
eTag: eTag,
}
bw.appenPos.Store(size)
res := &writer{
driver: d,
path: path,
size: new(atomic.Int64),
ctx: ctx,
bw: bufio.NewWriterSize(bw, int(MaxChunkSize)),
}
res.size.Store(size)
return res
}
func (w *writer) Write(p []byte) (int, error) {
switch {
case w.closed:
return 0, storagedriver.ErrAlreadyClosed
case w.committed:
return 0, storagedriver.ErrAlreadyCommited
case w.canceled:
return 0, storagedriver.ErrAlreadyCanceled
}
n, err := w.bw.Write(p)
w.size.Add(int64(n)) // nolint: gosec // Write will always return non-negative number of bytes written
return n, err
}
func (w *writer) Size() int64 {
return w.size.Load()
}
func (w *writer) Close() error {
if w.closed {
return storagedriver.ErrAlreadyClosed
}
w.closed = true
if w.canceled {
// NOTE(prozlach): If the writer has already been canceled, then there
// is nothing to flush to the backend as the target file has already
// been deleted.
return nil
}
err := w.bw.Flush()
if err != nil {
return fmt.Errorf("flushing while closing writer: %w", err)
}
return nil
}
func (w *writer) Cancel() error {
if w.closed {
return storagedriver.ErrAlreadyClosed
} else if w.committed {
return storagedriver.ErrAlreadyCommited
}
w.canceled = true
blobRef := w.driver.client.NewBlobClient(w.path)
_, err := blobRef.Delete(w.ctx, nil)
if err != nil {
if Is404(err) {
return nil
}
return fmt.Errorf("removing canceled blob: %w", err)
}
return nil
}
func (w *writer) Commit() error {
switch {
case w.closed:
return storagedriver.ErrAlreadyClosed
case w.committed:
return storagedriver.ErrAlreadyCommited
case w.canceled:
return storagedriver.ErrAlreadyCanceled
}
w.committed = true
err := w.bw.Flush()
if err != nil {
return fmt.Errorf("flushing while committing writer: %w", err)
}
return nil
}
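// blockWriter performs the actual AppendBlock calls. It tracks the expected
// append position and the blob's ETag so that concurrent writers or
// replayed requests (after operation timeouts) are detected instead of
// silently corrupting the blob.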
type blockWriter struct {
client *container.Client
path string
ctx context.Context
maxRetries int32
appenPos *atomic.Int64
eTag *azcore.ETag
}
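// Write appends p to the blob in chunks of at most MaxChunkSize bytes. If
// an AppendBlock call fails its append-position/ETag preconditions after a
// timeout, the already-uploaded bytes are verified via chunkUploadVerify
// before the upload is resumed or aborted.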
func (bw *blockWriter) Write(p []byte) (int, error) {
appendBlobRef := bw.client.NewAppendBlobClient(bw.path)
var n int64
offsetRetryCount := int32(0)
for n < int64(len(p)) {
appendPos := bw.appenPos.Load()
chunkSize := min(MaxChunkSize, int64(len(p))-n)
timeoutFromCtx := false
ctxTimeoutNotify := withTimeoutNotification(bw.ctx, &timeoutFromCtx)
if offsetRetryCount >= bw.maxRetries {
return int(n), fmt.Errorf("max number of retries (%d) reached while handling backend operation timeout", bw.maxRetries) // nolint: gosec // n is always going to be non-negative
}
resp, err := appendBlobRef.AppendBlock(
ctxTimeoutNotify,
streaming.NopCloser(bytes.NewReader(p[n:n+chunkSize])),
&appendblob.AppendBlockOptions{
AppendPositionAccessConditions: &appendblob.AppendPositionAccessConditions{
AppendPosition: to.Ptr(appendPos),
},
AccessConditions: &blob.AccessConditions{
ModifiedAccessConditions: &blob.ModifiedAccessConditions{
IfMatch: bw.eTag,
},
},
},
)
if err == nil {
n += chunkSize // number of bytes uploaded in this call to Write()
bw.eTag = resp.ETag
bw.appenPos.Add(chunkSize) // total size of the blob in the backend
continue
}
// NOTE(prozlach): These conditions could have triggered because
// either:
// * the operation timed out, but the data was actually appended
// * the operation timed out, but some other process managed to append
//   the data
// In both cases Azure returns bloberror.ConditionNotMet - i.e. the
// eTag condition failure takes precedence. bw.chunkUploadVerify
// verifies the actual data in the backend though, so we can put both
// cases in the same bucket: if the MD5 of the data is correct, we
// update the eTag and carry on; if it is not, we return an error, as
// some other process appended the data instead.
// Worth noting is that the eTag gives us additional protection here -
// i.e. in cases where the blob size does not change during an
// overwrite by another process - so the two conditions complement
// each other/are orthogonal.
appendposFailed := bloberror.HasCode(err, bloberror.AppendPositionConditionNotMet)
etagFailed := bloberror.HasCode(err, bloberror.ConditionNotMet)
if !(appendposFailed || etagFailed) || !timeoutFromCtx {
// Error was not caused by an operation timeout, abort!
return int(n), fmt.Errorf("appending blob: %w", err) // nolint: gosec // n is always going to be non-negative
}
correctlyUploadedBytes, msg, newEtag, err := bw.chunkUploadVerify(appendPos, p[n:n+chunkSize])
if err != nil {
return int(n), fmt.Errorf("%s while handling operation timeout during append of data to blob: %w", msg, err) // nolint: gosec // n is always going to be non-negative
}
bw.eTag = newEtag
if correctlyUploadedBytes == 0 {
offsetRetryCount++
continue
}
offsetRetryCount = 0
// MD5 is correct, data was uploaded. Let's bump the counters and
// continue with the upload
n += correctlyUploadedBytes // number of bytes uploaded in this call to Write()
bw.appenPos.Add(correctlyUploadedBytes) // total size of the blob in the backend
}
return int(n), nil // nolint: gosec // n is always going to be non-negative
}
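// chunkUploadVerify determines how much of the given chunk actually
// reached the backend after a timed-out AppendBlock call and verifies that
// data via MD5. It returns the number of verified bytes, a message used to
// annotate errors, and the blob's current ETag.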
func (bw *blockWriter) chunkUploadVerify(appendPos int64, chunk []byte) (int64, string, *azcore.ETag, error) {
// NOTE(prozlach): We need to see whether the chunk uploaded or not. As
// per the documentation, the operation __might__ have succeeded. There
// are three options:
// * the chunk did not upload; the blob size will be the same as the
//   append position we recorded (bw.appenPos). In this case we simply
//   need to re-upload the last chunk
// * the chunk or part of it was uploaded - we need to verify the
//   contents of what has been uploaded with an MD5 hash and either:
//   * MD5 is OK - continue uploading data starting from the next chunk
//   * MD5 is not OK - we have garbage at the end of the file, and since
//     AppendBlock supports only appending, we need to abort and return
//     a permanent error to the caller.
blobRef := bw.client.NewBlobClient(bw.path)
props, err := blobRef.GetProperties(bw.ctx, nil)
if err != nil {
return 0, "determining the end of the blob", nil, err
}
if props.ContentLength == nil {
return 0, "determining the end of the blob", nil, errors.New("ContentLength in blob properties is missing in reply")
}
uploadedBytes := *props.ContentLength - appendPos
if uploadedBytes == 0 {
// NOTE(prozlach): This should never really happen and is here only as
// a precaution in case something changes in the future. The idea is
// that if the HTTP call did not succeed and nothing was uploaded, then
// this code path is not going to be triggered, as there will be no
// AppendPos condition violation during the retry. OTOH, if the write
// succeeded even partially, then uploadedBytes will be greater than
// zero.
return 0, "", props.ETag, nil
}
if uploadedBytes > int64(len(chunk)) {
// NOTE(prozlach): If this happens, it means that there is more than
// one entity uploading data
return 0, "determining the end of the blob", nil, errors.New("uploaded more data than the chunk size")
}
response, err := blobRef.DownloadStream(
bw.ctx,
&blob.DownloadStreamOptions{
Range: blob.HTTPRange{Offset: appendPos, Count: uploadedBytes},
RangeGetContentMD5: to.Ptr(true), // we always upload <= 4MiB (i.e. MaxChunkSize), so we can try to offload the MD5 calculation to Azure
},
)
if err != nil {
return 0, "determining the MD5 of the upload blob chunk", nil, err
}
var uploadedMD5 []byte
// If upstream makes this extra check, then let's be paranoid too.
if len(response.ContentMD5) > 0 {
uploadedMD5 = response.ContentMD5
} else {
// compute md5
body := response.NewRetryReader(bw.ctx, &blob.RetryReaderOptions{MaxRetries: bw.maxRetries})
defer body.Close()
h := md5.New() // nolint: gosec // ok for content verification
_, err = io.Copy(h, body)
if err != nil {
return 0, "calculating the MD5 of the uploaded blob chunk", nil, err
}
uploadedMD5 = h.Sum(nil)
}
h := md5.New() // nolint: gosec // ok for content verification
if _, err = io.Copy(h, bytes.NewReader(chunk[:uploadedBytes])); err != nil {
return 0, "calculating the MD5 of the local blob chunk", nil, err
}
localMD5 := h.Sum(nil)
if !bytes.Equal(uploadedMD5, localMD5) {
return 0, "verifying contents of the uploaded blob chunk", nil, ErrCorruptedData
}
return uploadedBytes, "", response.ETag, nil
}