func()

in azkustoingest/internal/resources/resources.go [291:338]


// fetch retrieves the ingestion resources from Kusto and stores them on the
// Manager.
//
// m.fetchLock is held for the whole call. The ".get ingestion resources"
// management command is issued under a backoff retry that only retries
// throttled HTTP errors; any other failure aborts the retry loop at once.
// Each returned record is imported into a fresh Ingestion value, and an
// errDoNotCare result from importRec is ignored.
//
// On success the new Ingestion is stored in m.resources and m.lastFetchTime is
// set to the current UTC time. On any error neither is updated, and the
// returned error wraps the underlying cause so callers can inspect it with
// errors.Is / errors.As.
func (m *Manager) fetch(ctx context.Context) error {
	m.fetchLock.Lock()
	defer m.fetchLock.Unlock()

	var dataset v1.Dataset
	retryCtx := backoff.WithContext(initBackoff(), ctx)
	err := backoff.Retry(func() error {
		var err error
		dataset, err = m.client.Mgmt(ctx, "NetDefaultDB", kql.New(".get ingestion resources"))
		if err == nil {
			return nil
		}
		// Only throttling is worth retrying. errors.As is used instead of a
		// direct type assertion so that an HttpError wrapped by an
		// intermediate layer is still recognized.
		var httpErr *kustoErrors.HttpError
		if errors.As(err, &httpErr) && httpErr.IsThrottled() {
			return err
		}
		return backoff.Permanent(err)
	}, retryCtx)
	if err != nil {
		return fmt.Errorf("problem getting ingestion resources from Kusto: %w", err)
	}

	resc, err := query.ToStructs[ingestResc](dataset)
	if err != nil {
		return fmt.Errorf("problem reading ingestion resources from Kusto: %w", err)
	}

	ingest := Ingestion{}
	for _, rec := range resc {
		// errDoNotCare is not a failure of the fetch, so it is deliberately
		// ignored here.
		if err := ingest.importRec(rec, m.rankedStorageAccount); err != nil && !errors.Is(err, errDoNotCare) {
			return fmt.Errorf("problem reading ingestion resources from Kusto: %w", err)
		}
	}

	// Publish the result only after every record was processed successfully.
	m.resources.Store(ingest)
	m.lastFetchTime.Store(time.Now().UTC())

	return nil
}