exporter/azureblobexporter/exporter.go (208 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package azureblobexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/azureblobexporter"
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"math/rand/v2"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/appendblob"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/pipeline"
"go.uber.org/zap"
)
// azureBlobExporter holds the state shared by the metrics, logs and traces
// export paths of the Azure Blob Storage exporter.
type azureBlobExporter struct {
// config is the exporter configuration supplied at construction.
config *Config
// logger receives the debug record emitted after each successful export.
logger *zap.Logger
// client is the blob-storage client; it is assigned in start.
client azblobClient
// signal is the pipeline signal passed to newAzureBlobExporter.
signal pipeline.Signal
// marshaller encodes telemetry into bytes; it is assigned in start.
marshaller *marshaller
}
// azblobClient is the set of blob-storage operations the exporter calls.
// azblobClientImpl in this file implements it on top of azblob.Client.
type azblobClient interface {
// UploadStream uploads body as the blob blobName inside containerName.
UploadStream(ctx context.Context, containerName string, blobName string, body io.Reader, o *azblob.UploadStreamOptions) (azblob.UploadStreamResponse, error)
// URL returns the URL reported by the underlying client.
URL() string
// AppendBlock appends data to the append blob blobName inside containerName.
AppendBlock(ctx context.Context, containerName string, blobName string, data []byte, o *appendblob.AppendBlockOptions) error
}
// azblobClientImpl implements azblobClient by wrapping an *azblob.Client.
type azblobClientImpl struct {
// client is the wrapped SDK client; start populates it according to the
// configured authentication type.
client *azblob.Client
}
// UploadStream forwards the call, with all arguments unchanged, to the
// wrapped azblob.Client and returns its response and error as-is.
func (c *azblobClientImpl) UploadStream(ctx context.Context, containerName string, blobName string, body io.Reader, o *azblob.UploadStreamOptions) (azblob.UploadStreamResponse, error) {
return c.client.UploadStream(ctx, containerName, blobName, body, o)
}
// URL returns the URL reported by the wrapped azblob.Client.
func (c *azblobClientImpl) URL() string {
return c.client.URL()
}
// AppendBlock appends data as one block to the append blob blobName inside
// containerName.
//
// If the first attempt fails with a response error whose code is
// "BlobNotFound", the append blob is created and the append is retried once.
// Any other failure, and any failure of the create or the retry, is returned
// wrapped with a message identifying the step that failed.
//
// NOTE(review): Create is called without access conditions. Presumably two
// writers hitting BlobNotFound for the same blob at the same time could
// interfere with each other; confirm against the Azure append-blob semantics.
func (c *azblobClientImpl) AppendBlock(ctx context.Context, containerName string, blobName string, data []byte, o *appendblob.AppendBlockOptions) error {
	blobClient := c.client.ServiceClient().
		NewContainerClient(containerName).
		NewAppendBlobClient(blobName)

	_, appendErr := blobClient.AppendBlock(ctx, newReadSeekCloserWrapper(data), o)
	if appendErr == nil {
		return nil
	}

	// Only a missing blob is recoverable; everything else is reported as-is.
	var respErr *azcore.ResponseError
	if !errors.As(appendErr, &respErr) || respErr.ErrorCode != "BlobNotFound" {
		return fmt.Errorf("failed to append block: %w", appendErr)
	}

	if _, createErr := blobClient.Create(ctx, nil); createErr != nil {
		return fmt.Errorf("failed to create append blob: %w", createErr)
	}

	// A fresh reader is built for the retry so it starts from the first byte.
	if _, retryErr := blobClient.AppendBlock(ctx, newReadSeekCloserWrapper(data), o); retryErr != nil {
		return fmt.Errorf("failed to append block after creation: %w", retryErr)
	}
	return nil
}
// newAzureBlobExporter returns an exporter carrying the given configuration,
// logger and signal. The client and marshaller fields are left at their zero
// values; start fills them in.
func newAzureBlobExporter(config *Config, logger *zap.Logger, signal pipeline.Signal) *azureBlobExporter {
	exp := new(azureBlobExporter)
	exp.config = config
	exp.logger = logger
	exp.signal = signal
	return exp
}
func randomInRange(low, hi int) int {
return low + rand.IntN(hi-low)
}
// start prepares the exporter for use. It builds the marshaller from the
// configuration and host, then creates the Azure Blob client using the
// credentials that match the configured authentication type.
//
// It returns the marshaller error unchanged, a wrapped error when a credential
// or client cannot be created, and an error naming the type when the
// authentication type is not one of the supported values.
func (e *azureBlobExporter) start(_ context.Context, host component.Host) error {
	var err error
	if e.marshaller, err = newMarshaller(e.config, host); err != nil {
		return err
	}

	// The wrapper is installed first; each case below fills in its SDK client.
	impl := &azblobClientImpl{}
	e.client = impl

	switch authType := e.config.Auth.Type; authType {
	case ConnectionString:
		if impl.client, err = azblob.NewClientFromConnectionString(e.config.Auth.ConnectionString, nil); err != nil {
			return fmt.Errorf("failed to create client from connection string: %w", err)
		}

	case ServicePrincipal:
		cred, credErr := azidentity.NewClientSecretCredential(
			e.config.Auth.TenantID,
			e.config.Auth.ClientID,
			e.config.Auth.ClientSecret,
			nil,
		)
		if credErr != nil {
			return fmt.Errorf("failed to create service principal credential: %w", credErr)
		}
		if impl.client, err = azblob.NewClient(e.config.URL, cred, nil); err != nil {
			return fmt.Errorf("failed to create client with service principal: %w", err)
		}

	case SystemManagedIdentity:
		cred, credErr := azidentity.NewManagedIdentityCredential(nil)
		if credErr != nil {
			return fmt.Errorf("failed to create system managed identity credential: %w", credErr)
		}
		if impl.client, err = azblob.NewClient(e.config.URL, cred, nil); err != nil {
			return fmt.Errorf("failed to create client with system managed identity: %w", err)
		}

	case UserManagedIdentity:
		opts := &azidentity.ManagedIdentityCredentialOptions{
			ID: azidentity.ClientID(e.config.Auth.ClientID),
		}
		cred, credErr := azidentity.NewManagedIdentityCredential(opts)
		if credErr != nil {
			return fmt.Errorf("failed to create user managed identity credential: %w", credErr)
		}
		if impl.client, err = azblob.NewClient(e.config.URL, cred, nil); err != nil {
			return fmt.Errorf("failed to create client with user managed identity: %w", err)
		}

	default:
		return fmt.Errorf("unsupported authentication type: %s", authType)
	}

	return nil
}
// generateBlobName builds the blob name for one export of the given signal.
//
// The name is the current time rendered with the signal's configured layout,
// followed by "_" and a number drawn by randomInRange from 0 up to the
// configured SerialNumRange. An unsupported signal yields an error.
func (e *azureBlobExporter) generateBlobName(signal pipeline.Signal) (string, error) {
	timestamp := time.Now()

	var layout string
	switch signal {
	case pipeline.SignalMetrics:
		layout = e.config.BlobNameFormat.MetricsFormat
	case pipeline.SignalLogs:
		layout = e.config.BlobNameFormat.LogsFormat
	case pipeline.SignalTraces:
		layout = e.config.BlobNameFormat.TracesFormat
	default:
		return "", fmt.Errorf("unsupported signal type: %v", signal)
	}

	prefix := timestamp.Format(layout)
	serial := randomInRange(0, int(e.config.BlobNameFormat.SerialNumRange))
	return fmt.Sprintf("%s_%d", prefix, serial), nil
}
// Capabilities reports the exporter's consumer capabilities; MutatesData is
// false.
func (e *azureBlobExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
// ConsumeMetrics marshals md with the exporter's marshaller and hands the
// resulting bytes to consumeData as metrics. A marshalling failure is
// returned wrapped; otherwise the result of consumeData is returned.
func (e *azureBlobExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
	payload, marshalErr := e.marshaller.marshalMetrics(md)
	if marshalErr != nil {
		return fmt.Errorf("failed to marshal metrics: %w", marshalErr)
	}

	return e.consumeData(ctx, payload, pipeline.SignalMetrics)
}
// ConsumeLogs marshals ld with the exporter's marshaller and hands the
// resulting bytes to consumeData as logs. A marshalling failure is returned
// wrapped; otherwise the result of consumeData is returned.
func (e *azureBlobExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
	payload, marshalErr := e.marshaller.marshalLogs(ld)
	if marshalErr != nil {
		return fmt.Errorf("failed to marshal logs: %w", marshalErr)
	}

	return e.consumeData(ctx, payload, pipeline.SignalLogs)
}
// ConsumeTraces marshals td with the exporter's marshaller and hands the
// resulting bytes to consumeData as traces. A marshalling failure is returned
// wrapped; otherwise the result of consumeData is returned.
func (e *azureBlobExporter) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
	payload, marshalErr := e.marshaller.marshalTraces(td)
	if marshalErr != nil {
		return fmt.Errorf("failed to marshal traces: %w", marshalErr)
	}

	return e.consumeData(ctx, payload, pipeline.SignalTraces)
}
// consumeData writes one marshalled payload to blob storage.
//
// It generates a blob name for the signal, picks the container configured for
// that signal, and then either appends the payload to an append blob (when
// append-blob mode is configured and enabled, adding the configured separator
// if it is non-empty) or uploads it as a stream. On success a debug record
// with the account URL, container, blob name and payload size is logged.
//
// Errors from name generation and from the upload or append are returned
// wrapped; an unsupported signal yields an error.
func (e *azureBlobExporter) consumeData(ctx context.Context, data []byte, signal pipeline.Signal) error {
	blobName, err := e.generateBlobName(signal)
	if err != nil {
		return fmt.Errorf("failed to generate blobname: %w", err)
	}

	var containerName string
	switch signal {
	case pipeline.SignalMetrics:
		containerName = e.config.Container.Metrics
	case pipeline.SignalLogs:
		containerName = e.config.Container.Logs
	case pipeline.SignalTraces:
		containerName = e.config.Container.Traces
	default:
		return fmt.Errorf("unsupported signal type: %v", signal)
	}

	appendCfg := e.config.AppendBlob
	switch {
	case appendCfg != nil && appendCfg.Enabled:
		// The separator, when set, is written after the payload.
		if appendCfg.Separator != "" {
			data = append(data, []byte(appendCfg.Separator)...)
		}
		err = e.client.AppendBlock(ctx, containerName, blobName, data, nil)
	default:
		_, err = e.client.UploadStream(ctx, containerName, blobName, bytes.NewReader(data), nil)
	}
	if err != nil {
		return fmt.Errorf("failed to upload data: %w", err)
	}

	e.logger.Debug("Successfully exported data to Azure Blob Storage",
		zap.String("account", e.client.URL()),
		zap.String("container", containerName),
		zap.String("blob", blobName),
		zap.Int("size", len(data)),
	)
	return nil
}
// readSeekCloserWrapper embeds a *bytes.Reader, which supplies Read and Seek,
// and adds a no-op Close so the value can be used where an io.ReadSeekCloser
// is required.
type readSeekCloserWrapper struct {
	*bytes.Reader
}

// newReadSeekCloserWrapper returns a wrapper that reads data from its first
// byte.
func newReadSeekCloserWrapper(data []byte) *readSeekCloserWrapper {
	reader := bytes.NewReader(data)
	return &readSeekCloserWrapper{Reader: reader}
}

// Close does nothing and always returns nil; the wrapper holds no resource
// that needs releasing.
func (readSeekCloserWrapper) Close() error {
	return nil
}