in azkustoingest/internal/resources/resources.go [291:338]
// fetch retrieves the ingestion resources from Kusto and caches them on the Manager.
//
// The whole call runs under m.fetchLock, so concurrent fetches are serialized.
// The ".get ingestion resources" management command is issued against
// "NetDefaultDB" and retried using the policy returned by initBackoff (bound to
// ctx), but only when the failure is an HTTP error that reports throttling; any
// other failure is treated as permanent and aborts the retry loop.
//
// Records for which importRec reports errDoNotCare are skipped. On success the
// parsed Ingestion value is stored in m.resources and the current UTC time in
// m.lastFetchTime. On failure nothing is stored and the returned error wraps
// the underlying cause, so callers can inspect it with errors.Is / errors.As.
func (m *Manager) fetch(ctx context.Context) error {
	m.fetchLock.Lock()
	defer m.fetchLock.Unlock()

	var dataset v1.Dataset
	retryCtx := backoff.WithContext(initBackoff(), ctx)
	err := backoff.Retry(func() error {
		var err error
		dataset, err = m.client.Mgmt(ctx, "NetDefaultDB", kql.New(".get ingestion resources"))
		if err == nil {
			return nil
		}
		// Only retry in case of throttling. errors.As is used so that a
		// throttling error is still recognized when it has been wrapped.
		var httpErr *kustoErrors.HttpError
		if errors.As(err, &httpErr) && httpErr.IsThrottled() {
			return err
		}
		return backoff.Permanent(err)
	}, retryCtx)
	if err != nil {
		return fmt.Errorf("problem getting ingestion resources from Kusto: %w", err)
	}

	resc, err := query.ToStructs[ingestResc](dataset)
	if err != nil {
		return fmt.Errorf("problem reading ingestion resources from Kusto: %w", err)
	}

	ingest := Ingestion{}
	for _, rec := range resc {
		// errDoNotCare marks a record that is intentionally ignored; any
		// other import error aborts the fetch without touching the cache.
		if err := ingest.importRec(rec, m.rankedStorageAccount); err != nil && !errors.Is(err, errDoNotCare) {
			return fmt.Errorf("problem reading ingestion resources from Kusto: %w", err)
		}
	}

	m.resources.Store(ingest)
	m.lastFetchTime.Store(time.Now().UTC())
	return nil
}