x-pack/filebeat/input/azureeventhub/v2_input.go

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build !aix

package azureeventhub

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
	"github.com/Azure/azure-sdk-for-go/sdk/azcore/cloud"
	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"

	v2 "github.com/elastic/beats/v7/filebeat/input/v2"
	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/common/acker"
	"github.com/elastic/beats/v7/libbeat/common/backoff"
	"github.com/elastic/elastic-agent-libs/logp"
	"github.com/elastic/elastic-agent-libs/mapstr"
)

const (
	// startPositionEarliest lets the processor start from the earliest
	// available event from the event hub retention period.
	startPositionEarliest = "earliest"

	// startPositionLatest lets the processor start from the latest
	// available event from the event hub retention period.
	startPositionLatest = "latest"

	// processorRestartBackoff is the initial backoff time before
	// restarting the processor.
	processorRestartBackoff = 10 * time.Second

	// processorRestartMaxBackoff is the maximum backoff time before
	// restarting the processor.
	processorRestartMaxBackoff = 120 * time.Second
)

// eventHubInputV2 is the Azure Event Hub input v2,
// that uses the modern Azure Event Hub SDK for Go.
type eventHubInputV2 struct {
	config             azureInputConfig
	log                *logp.Logger
	metrics            *inputMetrics
	checkpointStore    *checkpoints.BlobStore
	consumerClient     *azeventhubs.ConsumerClient
	pipeline           beat.Pipeline
	messageDecoder     messageDecoder
	migrationAssistant *migrationAssistant
}

// newEventHubInputV2 creates a new instance of the Azure Event Hub input v2,
// that uses the modern Azure Event Hub SDK for Go.
func newEventHubInputV2(config azureInputConfig, log *logp.Logger) (v2.Input, error) {
	return &eventHubInputV2{
		config: config,
		log:    log.Named(inputName),
	}, nil
}

func (in *eventHubInputV2) Name() string {
	return inputName
}

func (in *eventHubInputV2) Test(v2.TestContext) error {
	return nil
}

// Run starts the Azure Event Hub input v2.
func (in *eventHubInputV2) Run(
	inputContext v2.Context,
	pipeline beat.Pipeline,
) error {
	var err error

	ctx := v2.GoContextFromCanceler(inputContext.Cancelation)

	// Set up input metrics.
	inputMetrics := newInputMetrics(inputContext.ID, nil)
	defer inputMetrics.Close()
	in.metrics = inputMetrics

	// Initialize the components needed to process events.
	err = in.setup(ctx)
	if err != nil {
		return err
	}
	defer in.teardown(ctx)

	// Store a reference to the pipeline, so we
	// can create a new pipeline client for each
	// partition.
	in.pipeline = pipeline

	// Start the main run loop.
	in.run(ctx)

	return nil
}

// setup initializes the components needed to process events.
func (in *eventHubInputV2) setup(ctx context.Context) error {
	sanitizers, err := newSanitizers(in.config.Sanitizers, in.config.LegacySanitizeOptions)
	if err != nil {
		return fmt.Errorf("failed to create sanitizers: %w", err)
	}

	// Decode the messages from event hub into
	// a `[]string`.
	in.messageDecoder = messageDecoder{
		config:     in.config,
		log:        in.log,
		metrics:    in.metrics,
		sanitizers: sanitizers,
	}

	containerClient, err := container.NewClientFromConnectionString(
		in.config.SAConnectionString,
		in.config.SAContainer,
		&container.ClientOptions{
			ClientOptions: azcore.ClientOptions{
				// FIXME: check more pipelineClient creation options.
				Cloud: cloud.AzurePublic,
			},
		},
	)
	if err != nil {
		return fmt.Errorf("failed to create blob container client: %w", err)
	}

	// The modern event hub SDK does not create the container
	// automatically like the old SDK.
	//
	// The new `BlobStore` explicitly says:
	// "the container must exist before the checkpoint store can be used."
	//
	// We need to ensure it exists before we can use it.
	err = in.ensureContainerExists(ctx, containerClient)
	if err != nil {
		return fmt.Errorf("failed to ensure blob container exists: %w", err)
	}

	// Create the checkpoint store.
	//
	// The processor uses the checkpoint store to persist
	// checkpoint information in the container using blobs (one blob
	// for each event hub partition).
	checkpointStore, err := checkpoints.NewBlobStore(containerClient, nil)
	if err != nil {
		return fmt.Errorf("failed to create checkpoint store: %w", err)
	}
	in.checkpointStore = checkpointStore

	// Create the event hub consumerClient to receive events.
	consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(
		in.config.ConnectionString,
		in.config.EventHubName,
		in.config.ConsumerGroup,
		nil,
	)
	if err != nil {
		return fmt.Errorf("failed to create consumer client: %w", err)
	}
	in.consumerClient = consumerClient

	// Manage the migration of the checkpoint information
	// from the old Event Hub SDK to the new Event Hub SDK.
	in.migrationAssistant = newMigrationAssistant(
		in.log,
		consumerClient,
		containerClient,
		checkpointStore,
	)

	return nil
}

// teardown releases the resources used by the input.
func (in *eventHubInputV2) teardown(ctx context.Context) {
	if in.consumerClient == nil {
		return
	}

	err := in.consumerClient.Close(ctx)
	if err != nil {
		in.log.Errorw(
			"error closing consumer client",
			"error", err,
		)
	}
}

// run starts the main loop for processing events.
func (in *eventHubInputV2) run(ctx context.Context) {
	if in.config.MigrateCheckpoint {
		in.log.Infow("checkpoint migration is enabled")

		// Check if we need to migrate the checkpoint store.
		err := in.migrationAssistant.checkAndMigrate(
			ctx,
			in.config.ConnectionString,
			in.config.EventHubName,
			in.config.ConsumerGroup,
		)
		if err != nil {
			in.log.Errorw("error migrating checkpoint store", "error", err)
			// FIXME: should we return here?
		}
	}

	// Handle the case when the processor stops due to
	// transient errors (network failures) and we need to
	// restart it.
	processorRunBackoff := backoff.NewEqualJitterBackoff(
		ctx.Done(),
		processorRestartBackoff,    // initial backoff
		processorRestartMaxBackoff, // max backoff
	)

	// Create the processor options using the input
	// configuration.
	processorOptions := createProcessorOptions(in.config)

	for ctx.Err() == nil {
		// Create a new processor for each run.
		//
		// The docs explicitly say that the processor
		// is not reusable.
		processor, err := azeventhubs.NewProcessor(
			in.consumerClient,
			in.checkpointStore,
			processorOptions,
		)
		if err != nil {
			in.log.Errorw("error creating processor", "error", err)
			return
		}

		// Launch one goroutine for each partition
		// to process events.
		go in.workersLoop(ctx, processor)

		// Run the processor to start processing events.
		//
		// This is a blocking call.
		//
		// It will return when the processor stops due to:
		// - an error
		// - the context being cancelled.
		//
		// On cancellation, it will return a nil error.
		if err := processor.Run(ctx); err != nil {
			in.log.Errorw("processor exited with a non-nil error", "error", err)
			in.log.Infow("waiting before retrying starting the processor")

			// `Run()` returns an error when the processor thinks it's
			// unrecoverable.
			//
			// We wait before retrying to start the processor.
			processorRunBackoff.Wait()

			// Update input metrics.
			in.metrics.processorRestarts.Inc()
		}

		in.log.Infow(
			"run completed; restarting the processor if context error is nil",
			"context_error", ctx.Err(),
		)
	}
}

// createProcessorOptions creates the processor options using the input configuration.
func createProcessorOptions(config azureInputConfig) *azeventhubs.ProcessorOptions {
	// Start position offers multiple options:
	//
	// - Offset
	// - SequenceNumber
	// - EnqueuedTime
	//
	// As of now, we only support Earliest and Latest.
	//
	// The processor uses the default start position for
	// all partitions if there is no checkpoint information
	// available from the storage account container.
	defaultStartPosition := azeventhubs.StartPosition{}

	switch config.ProcessorStartPosition {
	case startPositionEarliest:
		defaultStartPosition.Earliest = to.Ptr(true)
	case startPositionLatest:
		defaultStartPosition.Latest = to.Ptr(true)
	}

	return &azeventhubs.ProcessorOptions{
		//
		// The `LoadBalancingStrategy` controls how the
		// processor distributes the partitions across the
		// consumers.
		//
		// LoadBalancingStrategy offers multiple options:
		//
		// - Balanced
		// - Greedy
		//
		// As of now, we only support the "balanced" load
		// balancing strategy for backward compatibility with
		// the old SDK.
		//
		LoadBalancingStrategy: azeventhubs.ProcessorStrategyBalanced,
		UpdateInterval:        config.ProcessorUpdateInterval,
		StartPositions: azeventhubs.StartPositions{
			Default: defaultStartPosition,
		},
	}
}

// ensureContainerExists ensures the blob container exists.
func (in *eventHubInputV2) ensureContainerExists(ctx context.Context, blobContainerClient *container.Client) error {
	exists, err := in.containerExists(ctx, blobContainerClient)
	if err != nil {
		return fmt.Errorf("failed to check if blob container exists: %w", err)
	}
	if exists {
		return nil
	}

	// Since the container does not exist, we create it.
	r, err := blobContainerClient.Create(ctx, nil)
	if err != nil {
		// If the container already exists, we ignore the error.
		var responseError *azcore.ResponseError
		if !errors.As(err, &responseError) || responseError.ErrorCode != string(bloberror.ContainerAlreadyExists) {
			return fmt.Errorf("failed to create blob container: %w", err)
		}

		// The container already exists, so we can ignore the error.
		//
		// This is a possible scenario when another process created
		// the container after we checked if it exists.
		in.log.Debugw(
			"blob container already exists, no need to create a new one",
			"container", in.config.SAContainer,
		)
	}

	in.log.Infow("blob container created successfully", "response", r)

	return nil
}

// containerExists checks if the blob container exists.
func (in *eventHubInputV2) containerExists(ctx context.Context, blobContainerClient *container.Client) (bool, error) {
	// Try to access the container to see if it exists.
	_, err := blobContainerClient.GetProperties(ctx, &container.GetPropertiesOptions{})
	if err == nil {
		in.log.Debugw("blob container already exists, no need to create a new one", "container", in.config.SAContainer)
		return true, nil
	}

	var responseError *azcore.ResponseError
	if errors.As(err, &responseError) && responseError.ErrorCode == string(bloberror.ContainerNotFound) {
		return false, nil
	}

	return false, fmt.Errorf("failed to check if blob container exists: %w", err)
}

// workersLoop starts a goroutine for each partition to process events.
func (in *eventHubInputV2) workersLoop(ctx context.Context, processor *azeventhubs.Processor) {
	for {
		// The call blocks until an owned partition is available or the
		// context is cancelled.
		processorPartitionClient := processor.NextPartitionClient(ctx)
		if processorPartitionClient == nil {
			// We break out of the for loop when `NextPartitionClient`
			// returns `nil` (it signals the processor has stopped).
			break
		}

		partitionID := processorPartitionClient.PartitionID()

		// Start a goroutine to process events for the partition.
		go func() {
			in.log.Infow(
				"starting a partition worker",
				"partition_id", partitionID,
			)

			if err := in.processEventsForPartition(ctx, processorPartitionClient); err != nil {
				// It seems we always get an error,
				// even when the processor is stopped.
				in.log.Infow(
					"stopping processing events for partition",
					"reason", err,
					"partition_id", partitionID,
				)
			}

			in.log.Infow(
				"partition worker exited",
				"partition_id", partitionID,
			)
		}()
	}
}

// processEventsForPartition receives events from a partition and processes them.
func (in *eventHubInputV2) processEventsForPartition(ctx context.Context, partitionClient *azeventhubs.ProcessorPartitionClient) error {
	// 1. [BEGIN] Initialize any partition specific resources for your application.
	// 2. [CONTINUOUS] Loop, calling ReceiveEvents() and UpdateCheckpoint().
	// 3. [END] Cleanup any resources.
	partitionID := partitionClient.PartitionID()

	// 1/3 [BEGIN] Initialize any partition specific resources for your application.
	pipelineClient, err := initializePartitionResources(ctx, partitionClient, in.pipeline, in.log)
	if err != nil {
		return err
	}

	defer func() {
		// 3/3 [END] Do cleanup here, like shutting down database clients
		// or other resources used for processing this partition.
		shutdownPartitionResources(ctx, partitionClient, pipelineClient)
		in.log.Debugw(
			"partition resources cleaned up",
			"partition_id", partitionID,
		)
	}()

	// 2/3 [CONTINUOUS] Receive events, checkpointing as needed using UpdateCheckpoint.
	for {
		// Wait up to `in.config.PartitionReceiveTimeout` for `in.config.PartitionReceiveCount` events,
		// otherwise return whatever we collected during that time.
		receiveCtx, cancelReceive := context.WithTimeout(ctx, in.config.PartitionReceiveTimeout)
		events, err := partitionClient.ReceiveEvents(receiveCtx, in.config.PartitionReceiveCount, nil)
		cancelReceive()

		if err != nil && !errors.Is(err, context.DeadlineExceeded) {
			var eventHubError *azeventhubs.Error
			if errors.As(err, &eventHubError) && eventHubError.Code == azeventhubs.ErrorCodeOwnershipLost {
				in.log.Infow(
					"ownership lost for partition, stopping processing",
					"partition_id", partitionID,
				)

				return nil
			}

			return err
		}

		if len(events) == 0 {
			in.log.Debugw(
				"no events received",
				"partition_id", partitionID,
			)
			continue
		}

		err = in.processReceivedEvents(events, partitionID, pipelineClient)
		if err != nil {
			return fmt.Errorf("error processing received events: %w", err)
		}
	}
}

// processReceivedEvents decodes the received events and publishes the
// resulting records to the Beats pipeline.
func (in *eventHubInputV2) processReceivedEvents(receivedEvents []*azeventhubs.ReceivedEventData, partitionID string, pipelineClient beat.Client) error {
	processingStartTime := time.Now()

	for _, receivedEventData := range receivedEvents {
		eventHubMetadata := mapstr.M{
			"partition_id":   partitionID,
			"eventhub":       in.config.EventHubName,
			"consumer_group": in.config.ConsumerGroup,
		}

		// Update input metrics.
		in.metrics.receivedMessages.Inc()
		in.metrics.receivedBytes.Add(uint64(len(receivedEventData.Body)))

		_, _ = eventHubMetadata.Put("offset", receivedEventData.Offset)
		_, _ = eventHubMetadata.Put("sequence_number", receivedEventData.SequenceNumber)
		_, _ = eventHubMetadata.Put("enqueued_time", receivedEventData.EnqueuedTime)

		// The partition key is optional.
		if receivedEventData.PartitionKey != nil {
			_, _ = eventHubMetadata.Put("partition_key", *receivedEventData.PartitionKey)
		}

		// A single event can contain multiple records.
		// We create a new event for each record.
		records := in.messageDecoder.Decode(receivedEventData.Body)

		for _, record := range records {
			event := beat.Event{
				// This is the default value for the @timestamp field; usually the ingest
				// pipeline replaces it with a value in the payload.
				Timestamp: processingStartTime,
				Fields: mapstr.M{
					"message": record,
					"azure":   eventHubMetadata,
				},
				Private: receivedEventData,
			}

			// Publish the event to the Beats pipeline.
			pipelineClient.Publish(event)

			// Update input metrics.
			in.metrics.sentEvents.Inc()
		}

		// Update input metrics.
		in.metrics.processedMessages.Inc()
		in.metrics.processingTime.Update(time.Since(processingStartTime).Nanoseconds())
	}

	return nil
}

// initializePartitionResources initializes any partition specific resources for your application.
//
// It sets up a pipelineClient for publishing events and receiving notifications of their ACKs.
func initializePartitionResources(ctx context.Context, partitionClient *azeventhubs.ProcessorPartitionClient, pipeline beat.Pipeline, log *logp.Logger) (beat.Client, error) {
	// Initialize things that might be partition specific, like a
	// database connection.
	return pipeline.ConnectWith(beat.ClientConfig{
		EventListener: acker.LastEventPrivateReporter(func(acked int, data any) {
			// Update the checkpoint for the given partition.
			//
			// The pipeline calls this function when events have been
			// successfully published and acknowledged by the output.
			//
			// The data parameter contains the private data of the last
			// acknowledged event.
			receivedEventData, ok := data.(*azeventhubs.ReceivedEventData)
			if !ok {
				log.Errorw(
					"error updating checkpoint",
					"partition_id", partitionClient.PartitionID(),
					"acked", acked,
					"error", "invalid data type",
					"type", fmt.Sprintf("%T", data),
				)
				return
			}

			err := partitionClient.UpdateCheckpoint(ctx, receivedEventData, nil)
			if err != nil {
				log.Errorw(
					"error updating checkpoint",
					"error", err,
				)
			}

			log.Debugw(
				"checkpoint updated",
				"partition_id", partitionClient.PartitionID(),
				"acked", acked,
				"sequence_number", receivedEventData.SequenceNumber,
				"offset", receivedEventData.Offset,
				"enqueued_time", receivedEventData.EnqueuedTime,
			)
		}),
		Processing: beat.ProcessingConfig{
			// This input only produces events with basic types so normalization
			// is not required.
			EventNormalization: to.Ptr(false),
		},
	})
}

// shutdownPartitionResources releases the resources used to process
// events for a partition.
func shutdownPartitionResources(ctx context.Context, partitionClient *azeventhubs.ProcessorPartitionClient, pipelineClient beat.Client) {
	// Each PartitionClient holds onto an external resource and should be closed if you're
	// not processing them anymore.
	defer partitionClient.Close(ctx)

	// Closing the pipeline client since we're done
	// processing events for this partition.
	defer pipelineClient.Close()
}