func()

in registry/storage/driver/azure/v2/azure.go [413:510]


// Move moves the blob stored at sourcePath to destPath. It starts a
// server-side copy from the source blob to the destination blob, polls the
// destination blob's properties with exponential backoff until the copy
// reaches a final state, and deletes the source blob once the copy succeeded.
//
// It returns storagedriver.PathNotFoundError if starting the copy fails with
// a 404. If polling gives up before Azure reports a final copy state, an
// attempt is made to abort the copy so that it does not keep running in the
// background, and an error is returned. A 404 while deleting the source blob
// is ignored.
func (d *driver) Move(ctx context.Context, sourcePath, destPath string) error {
	ctxRetry := policy.WithRetryOptions(
		ctx,
		policy.RetryOptions{
			MaxRetries:    d.maxRetries,
			TryTimeout:    d.retryTryTimeout,
			RetryDelay:    d.retryDelay,
			MaxRetryDelay: d.maxRetryDelay,
		},
	)

	srcBlobRef := d.client.NewBlobClient(d.PathToKey(sourcePath))
	// NOTE(prozlach): No need to sign the src URL, as the credentials for the
	// dst blob will be used for accessing src blob when calling StartCopyFromURL()
	srcBlobURL := srcBlobRef.URL()

	dstBlobRef := d.client.NewBlobClient(d.PathToKey(destPath))
	resp, err := dstBlobRef.StartCopyFromURL(ctxRetry, srcBlobURL, nil)
	if err != nil {
		if Is404(err) {
			return storagedriver.PathNotFoundError{Path: sourcePath, DriverName: DriverName}
		}
		return err
	}

	// copyIDString renders an optional copy id for error messages without
	// dereferencing a nil pointer.
	copyIDString := func(id *string) string {
		if id == nil {
			return "<unknown>"
		}
		return *id
	}

	// copyEnded is set once the polled properties report a terminal failure
	// state (aborted/failed). In that case there is nothing left to abort.
	// backoff.Retry invokes the operation synchronously, so no locking is
	// needed.
	copyEnded := false

	b := backoff.NewExponentialBackOff(
		backoff.WithInitialInterval(d.poolInitialInterval),
		backoff.WithMaxInterval(d.poolMaxInterval),
		backoff.WithMaxElapsedTime(d.poolMaxElapsedTime),
	)
	ctxB := backoff.WithContext(b, ctxRetry)

	// Operation to check copy status
	operation := func() error {
		props, err := dstBlobRef.GetProperties(ctxRetry, nil)
		if err != nil {
			// NOTE(prozlach): We do not treat this as a permanent error and
			// retry instead as this may be a transient error and the copy
			// operation may still be progressing on the Azure side.
			// In the worst case we will abort the whole copy operation after
			// `d.poolMaxElapsedTime` has been reached in case when this error
			// was a permanent one after all due to e.g. network connectivity
			// issues, but this seems like a lesser evil than letting the copy
			// operation finish in the background and having both src file and
			// dst file in the backend.
			return fmt.Errorf("getting blob properties: %w", err)
		}

		if props.CopyStatus == nil {
			return errors.New("copy status has not been set")
		}

		switch *props.CopyStatus {
		case blob.CopyStatusTypeSuccess:
			return nil
		case blob.CopyStatusTypePending:
			return ErrCopyStatusPending
		case blob.CopyStatusTypeAborted:
			copyEnded = true
			if props.CopyStatusDescription != nil {
				err = fmt.Errorf("move blob with copy id %s has been aborted: %s", copyIDString(props.CopyID), *props.CopyStatusDescription)
			} else {
				err = fmt.Errorf("move blob with copy id %s has been aborted", copyIDString(props.CopyID))
			}
			return backoff.Permanent(err)
		case blob.CopyStatusTypeFailed:
			copyEnded = true
			if props.CopyStatusDescription != nil {
				err = fmt.Errorf("move blob with copy id %s has failed on the Azure backend: %s", copyIDString(props.CopyID), *props.CopyStatusDescription)
			} else {
				err = fmt.Errorf("move blob with copy id %s has failed on the Azure backend", copyIDString(props.CopyID))
			}
			return backoff.Permanent(err)
		default:
			// NOTE(prozlach): this may be a transient error, give it a benefit
			// of the doubt and retry until we have a solid signal to abort
			return fmt.Errorf("unknown copy status: %s", *props.CopyStatus)
		}
	}

	// Use backoff retry for polling
	err = backoff.Retry(operation, ctxB)
	if err != nil {
		if errors.Is(err, ErrCopyStatusPending) {
			// Blob copy has not finished yet and we can't wait any longer.
			// Abort the operation and return the error.
			if resp.CopyID == nil {
				return fmt.Errorf("move blob did not finish after %s and the copy could not be aborted: no copy id was returned", b.GetElapsedTime())
			}
			if _, errAbort := dstBlobRef.AbortCopyFromURL(ctxRetry, *resp.CopyID, nil); errAbort != nil {
				return fmt.Errorf("aborting copy operation: %w, while handling move operation timeout", errAbort)
			}
			return fmt.Errorf("move blob did not finish after %s", b.GetElapsedTime())
		}

		// Polling gave up on a retryable error (properties could not be
		// fetched, status missing or unknown), so the copy may still be
		// running on the Azure side. Try to abort it so that we do not end
		// up with both src and dst blobs. This is skipped when Azure already
		// reported a terminal state, when the context is done (the abort
		// call would fail immediately), or when there is no copy id to use.
		if !copyEnded && ctxRetry.Err() == nil && resp.CopyID != nil {
			if _, errAbort := dstBlobRef.AbortCopyFromURL(ctxRetry, *resp.CopyID, nil); errAbort != nil {
				return fmt.Errorf("move blob: %w (aborting copy operation also failed: %v)", err, errAbort)
			}
		}
		return fmt.Errorf("move blob: %w", err)
	}

	// Blob might have been already deleted due to retry
	if _, err = srcBlobRef.Delete(ctxRetry, nil); err != nil && !Is404(err) {
		return fmt.Errorf("deleting source blob: %w", err)
	}
	return nil
}