internal/output/azureeventhub/azure_event_hub.go (61 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more agreements.
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
package azureeventhub
import (
"context"
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/elastic/stream/internal/output"
)
func init() {
output.Register("azureeventhub", New)
}
type Output struct {
opts *output.Options
producerClient *azeventhubs.ProducerClient
cancelFunc context.CancelFunc
cancelCtx context.Context
}
func New(opts *output.Options) (output.Output, error) {
var producerClient *azeventhubs.ProducerClient
var err error
if opts.AzureEventHubOptions.ConnectionString != "" {
producerClient, err = azeventhubs.NewProducerClientFromConnectionString(opts.AzureEventHubOptions.ConnectionString, opts.AzureEventHubOptions.EventHubName, nil)
if err != nil {
return nil, fmt.Errorf("error while creating new eventhub producer client from connection string: %w", err)
}
} else {
fmt.Print("no connection string was provided, falling back to default credentials or environment variable")
// Credentials set as env variables - https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/azidentity#environment-variables
defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
return nil, fmt.Errorf("missing azure credentials in the environment variables: %w", err)
}
producerClient, err = azeventhubs.NewProducerClient(opts.AzureEventHubOptions.FullyQualifiedNamespace, opts.AzureEventHubOptions.EventHubName, defaultAzureCred, nil)
if err != nil {
return nil, fmt.Errorf("error while creating new eventhub producer client: %w", err)
}
}
ctx, cancel := context.WithCancel(context.Background())
return &Output{opts: opts, producerClient: producerClient, cancelFunc: cancel, cancelCtx: ctx}, nil
}
func (*Output) DialContext(_ context.Context) error {
return nil
}
func (o *Output) Close() error {
o.producerClient.Close(o.cancelCtx)
o.cancelFunc()
return nil
}
func (o *Output) Write(b []byte) (int, error) {
batch, err := o.producerClient.NewEventDataBatch(o.cancelCtx, nil)
if err != nil {
return 0, fmt.Errorf("error while creating new event data batch: %w", err)
}
eventData := azeventhubs.EventData{Body: b}
if err := batch.AddEventData(&eventData, nil); err != nil {
return 0, fmt.Errorf("error while adding data to event data batch: %w", err)
}
if err := o.producerClient.SendEventDataBatch(context.TODO(), batch, nil); err != nil {
return 0, fmt.Errorf("error while sending event data batch: %w", err)
}
return len(b), nil
}