azkustoingest/internal/resources/resources.go (316 lines of code) (raw):

// Package resources contains objects that are used to gather information about Kusto resources that are // used during various ingestion methods. package resources import ( "context" "errors" "fmt" "net/url" "strings" "sync" "sync/atomic" "time" "github.com/Azure/azure-kusto-go/azkustodata" "github.com/Azure/azure-kusto-go/azkustodata/query" v1 "github.com/Azure/azure-kusto-go/azkustodata/query/v1" kustoErrors "github.com/Azure/azure-kusto-go/azkustodata/errors" "github.com/Azure/azure-kusto-go/azkustodata/kql" "github.com/cenkalti/backoff/v4" ) const ( defaultInitialInterval = 1 * time.Second defaultMultiplier = 2 retryCount = 4 fetchInterval = 1 * time.Hour ) // mgmter is a private interface that allows us to write hermetic tests against the azkustodata.Client.Mgmt() method. type mgmter interface { Mgmt(ctx context.Context, db string, statement azkustodata.Statement, options ...azkustodata.QueryOption) (v1.Dataset, error) } // URI represents a resource URI for an ingestion command. type URI struct { u *url.URL account, objectName string sas url.Values } // Parse parses a string representing a Kutso resource URI. func Parse(resourceUri string) (*URI, error) { // Example for a valid url: // https://fkjsalfdks.blob.core.windows.com/sdsadsadsa?sas=asdasdasd u, err := url.Parse(resourceUri) if err != nil { return nil, err } if u.Scheme != "https" { return nil, fmt.Errorf("URI scheme must be 'https', was '%s'", u.Scheme) } v := &URI{ u: u, account: u.Hostname(), objectName: strings.TrimLeft(u.EscapedPath(), "/"), sas: u.Query(), } if err := v.validate(); err != nil { return nil, err } return v, nil } func (u *URI) validate() error { if u.objectName == "" { return fmt.Errorf("object name was not provided") } return nil } // Account is the Azure storage account that will be used. func (u *URI) Account() string { return u.account } // ObjectName returns the object name of the resource, i.e container name. func (u *URI) ObjectName() string { return u.objectName } // SAS is shared access signature used to access Azure storage. // https://docs.microsoft.com/en-us/azure/storage/common/storage-sas-overview func (u *URI) SAS() url.Values { return u.sas } // String implements fmt.Stringer. func (u *URI) String() string { return u.u.String() } // URL returns the internal *url.URL object. func (u *URI) URL() *url.URL { return u.u } // token represents a Kusto identity token. type token struct { AuthContext string `kusto:"AuthorizationContext"` } // Manager manages Kusto resources. type Manager struct { client mgmter done chan struct{} resources atomic.Value // Stores Ingestion lastFetchTime atomic.Value // Stores time.Time kustoToken token authTokenCacheExpiration time.Time authLock sync.Mutex fetchLock sync.Mutex rankedStorageAccount *RankedStorageAccountSet } // New is the constructor for Manager. func New(client mgmter) (*Manager, error) { m := &Manager{client: client, done: make(chan struct{}), rankedStorageAccount: newDefaultRankedStorageAccountSet()} m.authLock = sync.Mutex{} m.fetchLock = sync.Mutex{} m.authTokenCacheExpiration = time.Now().UTC() go m.renewResources() return m, nil } // Close closes the manager. This stops any token refreshes. func (m *Manager) Close() { for { select { case <-m.done: return default: close(m.done) return } } } func (m *Manager) renewResources() { tickDuration := 30 * time.Second tick := time.NewTicker(tickDuration) count := fetchInterval // Start with a fetch immediately. for { select { case <-tick.C: count += tickDuration if count >= fetchInterval { count = 0 * time.Second m.fetchRetry(context.Background()) } case <-m.done: tick.Stop() return } } } // AuthContext returns a string representing the authorization context. This auth token is a temporary token // that can be used to write a message via ingestion. This is different than the ADAL token. func (m *Manager) AuthContext(ctx context.Context) (string, error) { m.authLock.Lock() defer m.authLock.Unlock() if m.authTokenCacheExpiration.After(time.Now().UTC()) { return m.kustoToken.AuthContext, nil } var dataset v1.Dataset retryCtx := backoff.WithContext(initBackoff(), ctx) err := backoff.Retry(func() error { var err error dataset, err = m.client.Mgmt(ctx, "NetDefaultDB", kql.New(".get kusto identity token")) if err == nil { return nil } if httpErr, ok := err.(*kustoErrors.HttpError); ok { // only retry in case of throttling if httpErr.IsThrottled() { return err } } return backoff.Permanent(err) }, retryCtx) if err != nil { return "", fmt.Errorf("problem getting authorization context from Kusto via Mgmt: %s", err) } tokens, err := query.ToStructs[token](dataset) if err != nil { return "", err } if tokens == nil { return "", fmt.Errorf("call for AuthContext returned no Rows") } if len(tokens) != 1 { return "", fmt.Errorf("call for AuthContext returned more than 1 Row") } m.kustoToken = tokens[0] m.authTokenCacheExpiration = time.Now().UTC().Add(time.Hour) return tokens[0].AuthContext, nil } // ingestResc represents a kusto Mgmt() record about a resource type ingestResc struct { // Type is the type of resource, either "TempStorage" or "SecuredReadyForAggregationQueue". Type string `kusto:"ResourceTypeName"` // Root is the storage root URI, which should conform to the local URI type. Root string `kusto:"StorageRoot"` } // Ingestion holds information about Ingestion resources. type Ingestion struct { // Queues contains URIs for Queue resources. Queues []*URI // Containers has URIs for blob resources. Containers []*URI // Tables contains URIs for table resources. Tables []*URI // } var errDoNotCare = errors.New("don't care about this") func (i *Ingestion) importRec(rec ingestResc, rankedStorageAccounts *RankedStorageAccountSet) error { u, err := Parse(rec.Root) if err != nil { return fmt.Errorf("the StorageRoot URI received(%s) has an error: %s", rec.Root, err) } switch rec.Type { case "TempStorage": i.Containers = append(i.Containers, u) rankedStorageAccounts.registerStorageAccount(u.Account()) case "SecuredReadyForAggregationQueue": i.Queues = append(i.Queues, u) rankedStorageAccounts.registerStorageAccount(u.Account()) case "IngestionsStatusTable": i.Tables = append(i.Tables, u) default: return errDoNotCare } return nil } // Returns a list of ranked storage account resources distributed by round robin. func groupResourcesByStorageAccount(resources []*URI, rankedStorageAccount []RankedStorageAccount) []*URI { // Group the resources by storage account. storageAccounts := make(map[string][]*URI) for _, resource := range resources { storageAccounts[resource.Account()] = append(storageAccounts[resource.Account()], resource) } // Rank the resources by storage account. var rankedResources []*URI for _, account := range rankedStorageAccount { if resources, ok := storageAccounts[account.getAccountName()]; ok { rankedResources = append(rankedResources, resources...) } } //Distribute the resources by round robin. var distributedResources []*URI for i := 0; i < len(rankedResources); i++ { distributedResources = append(distributedResources, rankedResources[i%len(rankedResources)]) } return distributedResources } func (i *Ingestion) getRankedStorageContainers(rankedStorageAccounts []RankedStorageAccount) []*URI { return groupResourcesByStorageAccount(i.Containers, rankedStorageAccounts) } func (i *Ingestion) getRankedStorageQueues(rankedStorageAccounts []RankedStorageAccount) []*URI { return groupResourcesByStorageAccount(i.Queues, rankedStorageAccounts) } // fetch makes a azkustodata.Client.Mgmt() call to retrieve the resources used for Ingestion. func (m *Manager) fetch(ctx context.Context) error { m.fetchLock.Lock() defer m.fetchLock.Unlock() var dataset v1.Dataset retryCtx := backoff.WithContext(initBackoff(), ctx) err := backoff.Retry(func() error { var err error dataset, err = m.client.Mgmt(ctx, "NetDefaultDB", kql.New(".get ingestion resources")) if err == nil { return nil } if httpErr, ok := err.(*kustoErrors.HttpError); ok { // only retry in case of throttling if httpErr.IsThrottled() { return err } } return backoff.Permanent(err) }, retryCtx) if err != nil { return fmt.Errorf("problem getting ingestion resources from Kusto: %s", err) } ingest := Ingestion{} resc, err := query.ToStructs[ingestResc](dataset) if err != nil { return err } for _, rec := range resc { if err := ingest.importRec(rec, m.rankedStorageAccount); err != nil && !errors.Is(err, errDoNotCare) { return err } } if err != nil { return fmt.Errorf("problem reading ingestion resources from Kusto: %s", err) } m.resources.Store(ingest) m.lastFetchTime.Store(time.Now().UTC()) return nil } func (m *Manager) fetchRetry(ctx context.Context) error { attempts := 0 for { select { case <-m.done: return nil default: } ctx, cancel := context.WithTimeout(ctx, 30*time.Second) err := m.fetch(ctx) cancel() if err != nil { attempts++ if attempts > retryCount { return fmt.Errorf("failed to fetch ingestion resources: %w", err) } time.Sleep(10 * time.Second) continue } return nil } } func initBackoff() backoff.BackOff { exp := backoff.NewExponentialBackOff() exp.InitialInterval = defaultInitialInterval exp.Multiplier = defaultMultiplier return backoff.WithMaxRetries(exp, retryCount) } // Resources returns information about the ingestion resources. This will used cached information instead // of fetching from source. func (m *Manager) getResources() (Ingestion, error) { lastFetchTime, ok := m.lastFetchTime.Load().(time.Time) if !ok || lastFetchTime.Add(2*fetchInterval).Before(time.Now().UTC()) { err := m.fetchRetry(context.Background()) if err != nil { return Ingestion{}, err } } i, ok := m.resources.Load().(Ingestion) if !ok { return Ingestion{}, fmt.Errorf("manager has not retrieved an Ingestion object yet") } return i, nil } // Report storage account resource usage results. func (m *Manager) ReportStorageResourceResult(accountName string, success bool) { m.rankedStorageAccount.addAccountResult(accountName, success) } // Get ranked containers func (m *Manager) GetRankedStorageContainers() ([]*URI, error) { ingestionResources, err := m.getResources() if err != nil { return nil, err } return ingestionResources.getRankedStorageContainers(m.rankedStorageAccount.getRankedShuffledAccounts()), nil } // get ranked queues func (m *Manager) GetRankedStorageQueues() ([]*URI, error) { ingestionResources, err := m.getResources() if err != nil { return nil, err } return ingestionResources.getRankedStorageQueues(m.rankedStorageAccount.getRankedShuffledAccounts()), nil } func (m *Manager) GetTables() ([]*URI, error) { ingestionResources, err := m.getResources() if err != nil { return nil, err } return ingestionResources.Tables, nil }