internal/conn/storage/storage.go (192 lines of code) (raw):
/*
Package storage provides a client around Azure Blob Storage for sending data
that will be used by the ARN service.
Usage:
cli, err := storage.New("https://myaccount.blob.core.windows.net", cred)
if err != nil {
log.Fatal(err)
}
defer cli.Close()
if err := cli.Upload(ctx, "my-id", []byte("my-data")); err != nil {
log.Fatal(err)
}
Changes from the original:
- Moved to the slog package and from passing logrus via context.
- No longer use any logging from azcore.
- Made the credential cache non-blocking with background refreshes instead of stop the world refreshes
- Used the exponential backoff package for retries
- Moved to standard go options for constructors
- Removed use of "to" package for pointer creation, replaced with a simple generic function
- Wrote tests and changed structure to help with testing
- Did some re-ordering of events to avoid making unnecessary calls in certain failiure cases
*/
package storage
import (
"context"
"fmt"
"log/slog"
"net/url"
"regexp"
"testing"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
)
// Client is a client for interacting with Azure Blob Storage for pushing and pulling data
// used by the ARN service.
type Client struct {
now func() time.Time
cli *service.Client
clientOptions policy.ClientOptions
creds *credCache
contExt string
log *slog.Logger
// fakeUploader is used for testing purposes to simulate this client's response.
fakeUploader Uploader
fakeSignParams func(sigVals sas.BlobSignatureValues, cred *service.UserDelegationCredential) (encoder, error)
}
// Option is a function that sets an option on the client.
type Option func(*Client) error
// WithLogger sets the logger on the client. By default it uses slog.Default().
func WithLogger(log *slog.Logger) Option {
return func(c *Client) error {
c.log = log
return nil
}
}
// WithPolicyOptions sets the policy options for the service.Client.
// By default it uses the default policy options.
func WithPolicyOptions(opts policy.ClientOptions) Option {
return func(c *Client) error {
c.clientOptions = opts
return nil
}
}
var contRE = regexp.MustCompile(`^[a-z0-9-]{1,41}$`)
// WithContainerExt sets a name extension for a blob container. This can be useful for
// doing discovery of containers that are created by a particular client.
// Names are in the format "arm-ext-nt-YYYY-MM-DD". This will cause the client to create
// "arm-ext-nt-[ext]-YYYY-MM-DD". Note characters must be letters, numbers, or hyphens.
// Any letters will be automatically lowercased. The ext cannot be more than 41 characters.
func WithContainerExt(ext string) Option {
return func(c *Client) error {
if !contRE.MatchString(ext) {
return fmt.Errorf("container extension must be lowercase letters, numbers, or hyphens")
}
c.contExt = ext
return nil
}
}
// Uploader is an interface for testing purposes to simulate the Upload() method.
type Uploader interface {
// Upload simulates the Upload() method.
Upload(ctx context.Context, id string, b []byte) (*url.URL, error)
}
// WithFake sets a fake uploader for testing purposes. This will cause the client to use the fake
// Upload() method instead of the real one. Can only be used in testing.
func WithFake(f Uploader) Option {
return func(c *Client) error {
if !testing.Testing() {
return fmt.Errorf("storage.WithFake() can only be used in testing")
}
c.fakeUploader = f
return nil
}
}
// New creates a new storage client. endpoint is the Azure Blob Storage endpoint, cred is the
// Azure SDK TokenCredential, and opts are the policy options for the service.Client.
func New(endpoint string, cred azcore.TokenCredential, options ...Option) (*Client, error) {
client := &Client{
now: time.Now,
}
for _, o := range options {
if err := o(client); err != nil {
return nil, err
}
}
if client.log == nil {
client.log = slog.Default()
}
if client.fakeUploader != nil {
return client, nil
}
sClient, err := service.NewClient(endpoint, cred, &service.ClientOptions{ClientOptions: client.clientOptions})
if err != nil {
return nil, err
}
client.cli = sClient
// TODO: We need to check if the storage containers delete themselves after a certain period of time.
// If not fail.
client.creds, err = newCredCache(sClient, withLogger(client.log))
if err != nil {
return nil, err
}
return client, nil
}
// Close closes the client.
func (c *Client) Close() {
if c.fakeUploader != nil {
return
}
c.creds.close()
}
// Upload uploads bytes to a blob named id in today's container. It returns a SAS link enabling the blob to be read.
func (c *Client) Upload(ctx context.Context, id string, b []byte) (*url.URL, error) {
const contPrefix = "arm-ext-nt"
var cName string
if c.fakeUploader != nil {
return c.fakeUploader.Upload(ctx, id, b)
}
if c.contExt == "" {
cName = fmt.Sprintf("%s-%s", contPrefix, c.now().UTC().Format(time.DateOnly))
} else {
cName = fmt.Sprintf("%s-%s-%s", contPrefix, c.contExt, c.now().UTC().Format(time.DateOnly))
}
bName := id + ".txt"
cClient := c.cli.NewContainerClient(cName)
bClient := cClient.NewBlockBlobClient(bName)
u, err := url.Parse(bClient.URL())
if err != nil {
return nil, fmt.Errorf("URL returend by blob client is not a valid URL: %w", err)
}
args := uploadArgs{
id: id,
b: b,
cName: cName,
bName: bName,
upload: bClient,
create: cClient,
url: u,
}
return c.upload(ctx, args)
}
// uploadBuffer is an interface for uploading a buffer. Implemented by *blockblob.BlockBlobClient.
type uploadBuffer interface {
UploadBuffer(ctx context.Context, buffer []byte, o *blockblob.UploadBufferOptions) (blockblob.UploadBufferResponse, error)
}
// createContainer is an interface for creating a container.
// Implemented by *container.Client.
type createContainer interface {
Create(ctx context.Context, options *container.CreateOptions) (container.CreateResponse, error)
}
// uploadArgs is a struct for holding the arguments to the upload function.
// It is field-aligned.
type uploadArgs struct {
upload uploadBuffer
create createContainer
url *url.URL
id string
cName string
bName string
b []byte
}
func (c *Client) upload(ctx context.Context, args uploadArgs) (*url.URL, error) {
cred, err := c.creds.get(ctx)
if err != nil {
return nil, err
}
// TODO: It would be better if we check for the existence of the container
// before trying to create it. It wasn't immediately obvious how to do that.
_, err = args.upload.UploadBuffer(ctx, args.b, nil)
if err := handleUploadErr(ctx, err, args.create); err != nil {
return nil, err
}
sigVals := sas.BlobSignatureValues{
Protocol: sas.ProtocolHTTPS,
StartTime: time.Now().UTC().Add(time.Second * -10),
ExpiryTime: c.now().UTC().Add(7 * 24 * time.Hour),
Permissions: (&sas.BlobPermissions{Read: true}).String(),
ContainerName: args.cName,
BlobName: args.bName,
}
c.log.Debug(fmt.Sprintf("Uploading to blob. Container: %s, Blob: %s", args.cName, args.bName))
enc, err := c.signParams(sigVals, cred)
if err != nil {
return nil, err
}
args.url.RawQuery = enc.Encode()
return args.url, nil
}
type encoder interface {
Encode() string
}
// signParams signs the parameters for the blob and returns a query string that can be used
// to access the blob. This is here because this is particularly difficult to test and needs to be faked,
// but if it doesn't work the whole system will fail.
func (c *Client) signParams(sigVals sas.BlobSignatureValues, cred *service.UserDelegationCredential) (encoder, error) {
if c.fakeSignParams != nil {
return c.fakeSignParams(sigVals, cred)
}
params, err := sigVals.SignWithUserDelegation(cred)
if err != nil {
return nil, err
}
return ¶ms, nil
}
// handleUploadErr handles the error returned by the upload operation.
func handleUploadErr(ctx context.Context, err error, client createContainer) error {
if err == nil {
return nil
}
if bloberror.HasCode(err, bloberror.ContainerNotFound) {
_, err = client.Create(ctx, nil)
if err == nil || bloberror.HasCode(err, bloberror.ContainerAlreadyExists) {
return nil
}
return err
}
return err
}
// toPtr returns a pointer to any value. Do not replace this with the various
// old "to" packages.
func toPtr[T any](v T) *T {
return &v
}