x-pack/filebeat/input/azureeventhub/v2_migration.go (217 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. //go:build !aix package azureeventhub import ( "context" "encoding/json" "errors" "fmt" "net/url" "strconv" "strings" "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" "github.com/elastic/elastic-agent-libs/logp" ) type consumerClient interface { GetEventHubProperties(ctx context.Context, options *azeventhubs.GetEventHubPropertiesOptions) (azeventhubs.EventHubProperties, error) } type containerClient interface { NewBlobClient(blobName string) *blob.Client NewListBlobsFlatPager(o *container.ListBlobsFlatOptions) *runtime.Pager[container.ListBlobsFlatResponse] } type checkpointer interface { SetCheckpoint(ctx context.Context, checkpoint azeventhubs.Checkpoint, options *azeventhubs.SetCheckpointOptions) error } // migrationAssistant assists the input in migrating // v1 checkpoint information to v2. type migrationAssistant struct { log *logp.Logger consumerClient consumerClient blobContainerClient containerClient checkpointStore checkpointer } // newMigrationAssistant creates a new migration assistant. func newMigrationAssistant(log *logp.Logger, consumerClient consumerClient, blobContainerClient containerClient, checkpointStore checkpointer) *migrationAssistant { return &migrationAssistant{ log: log, consumerClient: consumerClient, blobContainerClient: blobContainerClient, checkpointStore: checkpointStore, } } // checkAndMigrate checks if the v1 checkpoint information for the partitions // exists and migrates it to v2 if it does. func (m *migrationAssistant) checkAndMigrate(ctx context.Context, eventHubConnectionString, eventHubName, consumerGroup string) error { // Fetching event hub information eventHubProperties, err := m.consumerClient.GetEventHubProperties(ctx, nil) if err != nil { return fmt.Errorf("failed to get event hub properties: %w", err) } m.log.Infow( "event hub information", "name", eventHubProperties.Name, "created_on", eventHubProperties.CreatedOn, "partition_ids", eventHubProperties.PartitionIDs, ) // Parse the connection string to get FQDN. connectionStringInfo, err := parseConnectionString(eventHubConnectionString) if err != nil { return fmt.Errorf("failed to parse connection string: %w", err) } blobs, err := m.listBlobs(ctx) if err != nil { return err } for _, partitionID := range eventHubProperties.PartitionIDs { err = m.checkAndMigratePartition(ctx, blobs, partitionID, connectionStringInfo.FullyQualifiedNamespace, eventHubName, consumerGroup) if err != nil { return fmt.Errorf("failed to check and migrate partition: %w", err) } } return nil } // checkAndMigratePartition checks if the v1 checkpoint information for the `partitionID` exists // `partitionID` partition. func (m *migrationAssistant) checkAndMigratePartition( ctx context.Context, blobs map[string]bool, partitionID, fullyQualifiedNamespace, eventHubName, consumerGroup string) error { // Build the blob path (in the v2 checkpoint format) for the partition `partitionID` // using the fully qualified namespace, event hub name, consumer group, and partition ID. // // The blob path is in the format: // {fullyQualifiedNamespace}/{eventHubName}/{consumerGroup}/checkpoint/{partitionID} // // Here is an example of the blob path: // mbranca-general.servicebus.windows.net/mbrancalogs/$Default/checkpoint/0 // blob := fmt.Sprintf("%s/%s/%s/checkpoint/%s", fullyQualifiedNamespace, eventHubName, consumerGroup, partitionID) // Check if v2 checkpoint information exists if _, ok := blobs[blob]; ok { m.log.Infow( "checkpoint v2 information for partition already exists, no migration needed", "partitionID", partitionID, ) return nil } // Check if v1 checkpoint information exists if _, ok := blobs[partitionID]; !ok { m.log.Infow( "checkpoint v1 information for partition doesn't exist, no migration needed", "partitionID", partitionID, ) return nil } // Try downloading the checkpoint v1 information for the partition cln := m.blobContainerClient.NewBlobClient(partitionID) // 4KB buffer should be enough to read // the checkpoint v1 information. buff := [4096]byte{} size, err := cln.DownloadBuffer(ctx, buff[:], nil) if err != nil { return fmt.Errorf("failed to download checkpoint v1 information for partition %s: %w", partitionID, err) } m.log.Infow( "downloaded checkpoint v1 information for partition", "partitionID", partitionID, "size", size, ) // Unmarshal the checkpoint v1 information var checkpointV1 *LegacyCheckpoint if err := json.Unmarshal(buff[0:size], &checkpointV1); err != nil { return fmt.Errorf("failed to unmarshal checkpoint v1 information for partition %s: %w", partitionID, err) } // migrate the checkpoint v1 information to v2 m.log.Infow("migrating checkpoint v1 information to v2", "partitionID", partitionID) // Common checkpoint information checkpointV2 := azeventhubs.Checkpoint{ ConsumerGroup: consumerGroup, EventHubName: eventHubName, FullyQualifiedNamespace: fullyQualifiedNamespace, PartitionID: partitionID, } offset, err := strconv.ParseInt(checkpointV1.Checkpoint.Offset, 10, 64) if err != nil { return fmt.Errorf("failed to parse offset: %w", err) } checkpointV2.Offset = &offset checkpointV2.SequenceNumber = &checkpointV1.Checkpoint.SequenceNumber // Stores the checkpoint v2 information for the partition if err := m.checkpointStore.SetCheckpoint(ctx, checkpointV2, nil); err != nil { return fmt.Errorf("failed to update checkpoint v2 information for partition %s: %w", partitionID, err) } m.log.Infow("migrated checkpoint v1 information to v2", "partitionID", partitionID) return nil } // listBlobs lists all the blobs in the container. func (m *migrationAssistant) listBlobs(ctx context.Context) (map[string]bool, error) { blobs := map[string]bool{} c := m.blobContainerClient.NewListBlobsFlatPager(nil) for c.More() { page, err := c.NextPage(ctx) if err != nil { return map[string]bool{}, fmt.Errorf("failed to list blobs: %w", err) } for _, blob := range page.Segment.BlobItems { blobs[*blob.Name] = true } } return blobs, nil } type LegacyCheckpoint struct { PartitionID string `json:"partitionID"` Epoch int `json:"epoch"` Owner string `json:"owner"` Checkpoint struct { Offset string `json:"offset"` SequenceNumber int64 `json:"sequenceNumber"` EnqueueTime string `json:"enqueueTime"` // ": "0001-01-01T00:00:00Z" } `json:"checkpoint"` } // ConnectionStringProperties are the properties of a connection string // as returned by [ParseConnectionString]. type ConnectionStringProperties struct { // Endpoint is the Endpoint value in the connection string. // Ex: sb://example.servicebus.windows.net Endpoint string // EntityPath is EntityPath value in the connection string. EntityPath *string // FullyQualifiedNamespace is the Endpoint value without the protocol scheme. // Ex: example.servicebus.windows.net FullyQualifiedNamespace string // SharedAccessKey is the SharedAccessKey value in the connection string. SharedAccessKey *string // SharedAccessKeyName is the SharedAccessKeyName value in the connection string. SharedAccessKeyName *string // SharedAccessSignature is the SharedAccessSignature value in the connection string. SharedAccessSignature *string // Emulator indicates that the connection string is for an emulator: // ex: Endpoint=localhost:6765;SharedAccessKeyName=<< REDACTED >>;SharedAccessKey=<< REDACTED >>;UseDevelopmentEmulator=true Emulator bool } // ParseConnectionString takes a connection string from the Azure portal and returns the // parsed representation. // // There are two supported formats: // // 1. Connection strings generated from the portal (or elsewhere) that contain an embedded key and keyname. // // 2. A connection string with an embedded SharedAccessSignature: // Endpoint=sb://<sb>.servicebus.windows.net;SharedAccessSignature=SharedAccessSignature sr=<sb>.servicebus.windows.net&sig=<base64-sig>&se=<expiry>&skn=<keyname>" func parseConnectionString(connStr string) (ConnectionStringProperties, error) { const ( endpointKey = "Endpoint" sharedAccessKeyNameKey = "SharedAccessKeyName" sharedAccessKeyKey = "SharedAccessKey" entityPathKey = "EntityPath" sharedAccessSignatureKey = "SharedAccessSignature" useEmulator = "UseDevelopmentEmulator" ) csp := ConnectionStringProperties{} splits := strings.Split(connStr, ";") for _, split := range splits { if split == "" { continue } keyAndValue := strings.SplitN(split, "=", 2) if len(keyAndValue) < 2 { return ConnectionStringProperties{}, errors.New("failed parsing connection string due to unmatched key value separated by '='") } // if a key value pair has `=` in the value, recombine them key := keyAndValue[0] value := strings.Join(keyAndValue[1:], "=") switch { case strings.EqualFold(endpointKey, key): u, err := url.Parse(value) if err != nil { return ConnectionStringProperties{}, errors.New("failed parsing connection string due to an incorrectly formatted Endpoint value") } csp.Endpoint = value csp.FullyQualifiedNamespace = u.Host case strings.EqualFold(sharedAccessKeyNameKey, key): csp.SharedAccessKeyName = &value case strings.EqualFold(sharedAccessKeyKey, key): csp.SharedAccessKey = &value case strings.EqualFold(entityPathKey, key): csp.EntityPath = &value case strings.EqualFold(sharedAccessSignatureKey, key): csp.SharedAccessSignature = &value case strings.EqualFold(useEmulator, key): v, err := strconv.ParseBool(value) if err != nil { return ConnectionStringProperties{}, err } csp.Emulator = v } } if csp.Emulator { // check that they're only connecting to localhost endpointParts := strings.SplitN(csp.Endpoint, ":", 3) // allow for a port, if it exists. if len(endpointParts) < 2 || endpointParts[0] != "sb" || endpointParts[1] != "//localhost" { // there should always be at least two parts "sb:" and "//localhost" // with an optional 3rd piece that's the port "1111". // (we don't need to validate it's a valid host since it's been through url.Parse() above) return ConnectionStringProperties{}, fmt.Errorf("UseDevelopmentEmulator=true can only be used with sb://localhost or sb://localhost:<port number>, not %s", csp.Endpoint) } } if csp.FullyQualifiedNamespace == "" { return ConnectionStringProperties{}, fmt.Errorf("key %q must not be empty", endpointKey) } if csp.SharedAccessSignature == nil && csp.SharedAccessKeyName == nil { return ConnectionStringProperties{}, fmt.Errorf("key %q must not be empty", sharedAccessKeyNameKey) } if csp.SharedAccessKey == nil && csp.SharedAccessSignature == nil { return ConnectionStringProperties{}, fmt.Errorf("key %q or %q cannot both be empty", sharedAccessKeyKey, sharedAccessSignatureKey) } return csp, nil }