// x-pack/filebeat/input/azureeventhub/v1_input.go

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. //go:build !aix package azureeventhub import ( "context" "fmt" "strings" "time" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/go-autorest/autorest/azure" eventhub "github.com/Azure/azure-event-hubs-go/v3" "github.com/Azure/azure-event-hubs-go/v3/eph" "github.com/Azure/azure-event-hubs-go/v3/storage" "github.com/Azure/azure-storage-blob-go/azblob" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/acker" "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" ) // eventHubInputV1 is the Azure Event Hub input V1. // // This input uses the Azure Event Hub SDK v3 (legacy). type eventHubInputV1 struct { config azureInputConfig log *logp.Logger metrics *inputMetrics processor *eph.EventProcessorHost pipelineClient beat.Client messageDecoder messageDecoder } // newEventHubInputV1 creates a new instance of the Azure Event Hub input V1. // This input uses the Azure Event Hub SDK v3 (legacy). func newEventHubInputV1(config azureInputConfig, logger *logp.Logger) (v2.Input, error) { log := logger. Named(inputName). With( "connection string", stripConnectionString(config.ConnectionString), ) return &eventHubInputV1{ config: config, log: log, }, nil } func (in *eventHubInputV1) Name() string { return inputName } func (in *eventHubInputV1) Test(v2.TestContext) error { return nil } func (in *eventHubInputV1) Run( inputContext v2.Context, pipeline beat.Pipeline, ) error { var err error // Update the status to starting inputContext.UpdateStatus(status.Starting, "") // Create pipelineClient for publishing events. 
in.pipelineClient, err = createPipelineClient(pipeline) if err != nil { inputContext.UpdateStatus(status.Failed, err.Error()) return fmt.Errorf("failed to create pipeline pipelineClient: %w", err) } defer in.pipelineClient.Close() // Setup input metrics in.metrics = newInputMetrics(inputContext.ID, nil) defer in.metrics.Close() // Set up new and legacy sanitizers, if any. sanitizers, err := newSanitizers(in.config.Sanitizers, in.config.LegacySanitizeOptions) if err != nil { inputContext.UpdateStatus(status.Failed, err.Error()) return fmt.Errorf("failed to create sanitizers: %w", err) } in.messageDecoder = messageDecoder{ config: in.config, log: in.log, metrics: in.metrics, sanitizers: sanitizers, } ctx := v2.GoContextFromCanceler(inputContext.Cancelation) // Initialize the input components // in preparation for the main run loop. err = in.setup(ctx) if err != nil { in.log.Errorw("error setting up input", "error", err) inputContext.UpdateStatus(status.Failed, err.Error()) return err } // Start the main run loop err = in.run(ctx) if err != nil { in.log.Errorw("error running input", "error", err) inputContext.UpdateStatus(status.Failed, err.Error()) return err } inputContext.UpdateStatus(status.Stopping, "") return nil } // setup initializes the input components. // // The main components are: // 1. Azure Storage Leaser / Checkpointer // 2. Event Processor Host // 3. 
Message handler func (in *eventHubInputV1) setup(ctx context.Context) error { // ---------------------------------------------------- // 1 — Create a new Azure Storage Leaser / Checkpointer // ---------------------------------------------------- cred, err := azblob.NewSharedKeyCredential(in.config.SAName, in.config.SAKey) if err != nil { return err } env, err := getAzureEnvironment(in.config.OverrideEnvironment) if err != nil { return err } leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(cred, in.config.SAName, in.config.SAContainer, env) if err != nil { in.log.Errorw("error creating storage leaser checkpointer", "error", err) return err } in.log.Infof("storage leaser checkpointer created for container %q", in.config.SAContainer) // ------------------------------------------------ // 2 — Create a new event processor host // ------------------------------------------------ // adding a nil EventProcessorHostOption will break the code, // this is why a condition is added and a.processor is assigned. 
if in.config.ConsumerGroup != "" { in.processor, err = eph.NewFromConnectionString( ctx, fmt.Sprintf("%s%s%s", in.config.ConnectionString, eventHubConnector, in.config.EventHubName), leaserCheckpointer, leaserCheckpointer, eph.WithConsumerGroup(in.config.ConsumerGroup), eph.WithNoBanner()) } else { in.processor, err = eph.NewFromConnectionString( ctx, fmt.Sprintf("%s%s%s", in.config.ConnectionString, eventHubConnector, in.config.EventHubName), leaserCheckpointer, leaserCheckpointer, eph.WithNoBanner()) } if err != nil { in.log.Errorw("error creating processor", "error", err) return err } in.log.Infof("event processor host created for event hub %q", in.config.EventHubName) // ------------------------------------------------ // 3 — Register a message handler // ------------------------------------------------ // register a message handler -- many can be registered handlerID, err := in.processor.RegisterHandler(ctx, func(c context.Context, e *eventhub.Event) error { // Take the event message from the event hub, // creates and publishes one (or more) events // to the beats pipeline. in.processEvents(e) // Why is this function always returning no error? // // The legacy SDK does not offer hooks to control // checkpointing (it internally updates the checkpoint // info after a successful handler execution). // // So we are keeping the existing behaviour (do not // handle publish acks). // // On shutdown, Filebeat stops the input, waits for // the output to process all the events in the queue. return nil }) if err != nil { in.log.Errorw("error registering handler", "error", err) return err } in.log.Infof("handler id: %q is registered\n", handlerID) return nil } func (in *eventHubInputV1) run(ctx context.Context) error { // Start handling messages from all the partitions balancing across // multiple consumers. // The processor can be stopped by calling `Close()` on the processor. 
// The `Start()` function is not an option because // it waits for an `os.Interrupt` signal to stop // the processor. err := in.processor.StartNonBlocking(ctx) if err != nil { in.log.Errorw("error starting the processor", "error", err) return err } defer func() { in.log.Infof("%s input worker is stopping.", inputName) err := in.processor.Close(context.Background()) if err != nil { in.log.Errorw("error while closing eventhostprocessor", "error", err) } in.log.Infof("%s input worker has stopped.", inputName) }() in.log.Infof("%s input worker has started.", inputName) // wait for the context to be done <-ctx.Done() return ctx.Err() } func (in *eventHubInputV1) processEvents(event *eventhub.Event) { processingStartTime := time.Now() eventHubMetadata := mapstr.M{ // The `partition_id` is not available in the // legacy version of the SDK. "eventhub": in.config.EventHubName, "consumer_group": in.config.ConsumerGroup, } // update the input metrics in.metrics.receivedMessages.Inc() in.metrics.receivedBytes.Add(uint64(len(event.Data))) records := in.messageDecoder.Decode(event.Data) for _, record := range records { _, _ = eventHubMetadata.Put("offset", event.SystemProperties.Offset) _, _ = eventHubMetadata.Put("sequence_number", event.SystemProperties.SequenceNumber) _, _ = eventHubMetadata.Put("enqueued_time", event.SystemProperties.EnqueuedTime) event := beat.Event{ // We set the timestamp to the processing // start time as default value. // // Usually, the ingest pipeline replaces it // with a value in the payload. 
Timestamp: processingStartTime, Fields: mapstr.M{ "message": record, "azure": eventHubMetadata, }, Private: event.Data, } in.pipelineClient.Publish(event) in.metrics.sentEvents.Inc() } in.metrics.processedMessages.Inc() in.metrics.processingTime.Update(time.Since(processingStartTime).Nanoseconds()) } func createPipelineClient(pipeline beat.Pipeline) (beat.Client, error) { return pipeline.ConnectWith(beat.ClientConfig{ EventListener: acker.LastEventPrivateReporter(func(acked int, data interface{}) { // fmt.Println(acked, data) }), Processing: beat.ProcessingConfig{ // This input only produces events with basic types so normalization // is not required. EventNormalization: to.Ptr(false), }, }) } // Strip connection string to remove sensitive information // A connection string should look like this: // Endpoint=sb://dummynamespace.servicebus.windows.net/;SharedAccessKeyName=DummyAccessKeyName;SharedAccessKey=5dOntTRytoC24opYThisAsit3is2B+OGY1US/fuL3ly= // This code will remove everything after ';' so key information is stripped func stripConnectionString(c string) string { if parts := strings.SplitN(c, ";", 2); len(parts) == 2 { return parts[0] } // We actually expect the string to have the documented format // if we reach here something is wrong, so let's stay on the safe side return "(redacted)" } func getAzureEnvironment(overrideResManager string) (azure.Environment, error) { // if no override is set then the azure public cloud is used if overrideResManager == "" || overrideResManager == "<no value>" { return azure.PublicCloud, nil } if env, ok := environments[overrideResManager]; ok { return env, nil } // can retrieve hybrid env from the resource manager endpoint return azure.EnvironmentFromURL(overrideResManager) }