internal/output/azureblobstorage/azure_blob_storage.go (53 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more agreements.
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
package azureblobstorage
import (
"context"
"errors"
"fmt"
"github.com/elastic/stream/internal/output"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
blobalias "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
)
// init registers New with the output package under the name
// "azureblobstorage".
func init() {
	const name = "azureblobstorage"
	output.Register(name, New)
}
// Output uploads written data to a blob through an Azure Blob Storage client.
type Output struct {
	// opts holds the options this output was constructed with; the container
	// and blob names are read from opts.AzureBlobStorageOptions.
	opts *output.Options
	// client is the SDK client built in New from the connection string.
	client *azblob.Client
}
// New builds an Azure Blob Storage output whose blob endpoint is derived from
// opts.Addr and opts.AzureBlobStorageOptions.Port.
//
// It returns an error when opts.Addr is empty or when the SDK client cannot be
// created from the generated connection string.
func New(opts *output.Options) (output.Output, error) {
	if opts.Addr == "" {
		return nil, errors.New("azure blob storage address is required")
	}

	// A connection string is used for multiple reasons: it's easier to bypass
	// the URL endpoint, and the hardcoded credentials can easily be passed.
	// These credentials are the defaults for the Azurite Emulator, which is
	// why they can simply be hardcoded.
	connectionString := fmt.Sprintf("DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://%s:%s/devstoreaccount1;", opts.Addr, opts.AzureBlobStorageOptions.Port)

	// Surface construction failures here instead of handing back an Output
	// with a nil client that would only fail on first use.
	serviceClient, err := azblob.NewClientFromConnectionString(connectionString, nil)
	if err != nil {
		return nil, fmt.Errorf("creating azure blob storage client: %w", err)
	}

	return &Output{opts: opts, client: serviceClient}, nil
}
// DialContext creates the configured container using ctx and returns any
// error reported by that call.
func (o *Output) DialContext(ctx context.Context) error {
	return o.createContainer(ctx)
}
// Close is a no-op that always returns nil, as there is no client to close.
func (o *Output) Close() error {
	return nil
}
// Write uploads b to the configured container and blob with the content type
// "application/json". On success it returns len(b); on failure it returns 0
// and the wrapped upload error.
func (o *Output) Write(b []byte) (int, error) {
	contentType := "application/json"
	uploadOpts := &azblob.UploadBufferOptions{
		HTTPHeaders: &blobalias.HTTPHeaders{BlobContentType: &contentType},
	}

	// The method signature carries no context, so a background one is used.
	if _, err := o.client.UploadBuffer(
		context.Background(),
		o.opts.AzureBlobStorageOptions.Container,
		o.opts.AzureBlobStorageOptions.Blob,
		b,
		uploadOpts,
	); err != nil {
		return 0, fmt.Errorf("failed to upload file to blob: %w", err)
	}

	return len(b), nil
}
// createContainer asks the client to create the container named in
// o.opts.AzureBlobStorageOptions.Container and returns the client's error,
// if any.
func (o *Output) createContainer(ctx context.Context) error {
	_, err := o.client.CreateContainer(ctx, o.opts.AzureBlobStorageOptions.Container, nil)
	return err
}