storage/indexer.go:

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package storage

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"net/url"
	"strings"
	"sync"
	"time"

	"cloud.google.com/go/storage"
	"github.com/prometheus/client_golang/prometheus"
	"go.elastic.co/apm/v2"
	"go.uber.org/zap"

	"github.com/elastic/package-registry/metrics"
	"github.com/elastic/package-registry/packages"
)

// indexerGetDurationPrometheusLabel identifies this indexer in the Get duration metric.
const indexerGetDurationPrometheusLabel = "StorageIndexer"

// Indexer loads the package index from a storage bucket and serves it from memory.
type Indexer struct {
	options IndexerOptions

	storageClient *storage.Client

	cursor      string
	packageList packages.Packages

	m sync.RWMutex

	resolver packages.RemoteResolver

	logger *zap.Logger
}

// IndexerOptions configures the storage indexer.
type IndexerOptions struct {
	APMTracer                    *apm.Tracer
	PackageStorageBucketInternal string
	PackageStorageEndpoint       string
	WatchInterval                time.Duration
}

// NewIndexer creates a storage indexer, falling back to the default APM tracer when none is given.
func NewIndexer(logger *zap.Logger, storageClient *storage.Client, options IndexerOptions) *Indexer {
	if options.APMTracer == nil {
		options.APMTracer = apm.DefaultTracer()
	}
	return &Indexer{
		storageClient: storageClient,
		options:       options,
		logger:        logger,
	}
}

// Init validates the options, sets up the remote resolver, populates the index once,
// and starts the background watcher.
func (i *Indexer) Init(ctx context.Context) error {
	i.logger.Debug("Initialize storage indexer")

	err := validateIndexerOptions(i.options)
	if err != nil {
		return fmt.Errorf("validation failed: %w", err)
	}

	err = i.setupResolver()
	if err != nil {
		return fmt.Errorf("can't setup remote resolver: %w", err)
	}

	// Populate index file for the first time.
	err = i.updateIndex(ctx)
	if err != nil {
		return fmt.Errorf("can't update index file: %w", err)
	}

	go i.watchIndices(apm.ContextWithTransaction(ctx, nil))
	return nil
}

func validateIndexerOptions(options IndexerOptions) error {
	if !strings.HasPrefix(options.PackageStorageBucketInternal, "gs://") {
		return errors.New("missing or invalid options.PackageStorageBucketInternal")
	}
	_, err := url.Parse(options.PackageStorageEndpoint)
	if err != nil {
		return fmt.Errorf("invalid options.PackageStorageEndpoint, URL expected: %w", err)
	}
	if options.WatchInterval < 0 {
		return errors.New("options.WatchInterval must be greater than or equal to 0")
	}
	return nil
}

// setupResolver builds the HTTP-based resolver that serves package and static artifacts
// from the configured storage endpoint.
func (i *Indexer) setupResolver() error {
	baseURL, err := url.Parse(i.options.PackageStorageEndpoint)
	if err != nil {
		return err
	}

	httpClient := http.Client{
		Transport: &http.Transport{
			DialContext: (&net.Dialer{
				// Connect timeout.
				Timeout: 20 * time.Second,
			}).DialContext,
		},
	}

	i.resolver = storageResolver{
		client:               &httpClient,
		artifactsPackagesURL: *baseURL.ResolveReference(&url.URL{Path: artifactsPackagesStoragePath + "/"}),
		artifactsStaticURL:   *baseURL.ResolveReference(&url.URL{Path: artifactsStaticStoragePath + "/"}),
	}
	return nil
}

// watchIndices periodically refreshes the index at the configured WatchInterval.
func (i *Indexer) watchIndices(ctx context.Context) {
	i.logger.Debug("Watch indices for changes")
	if i.options.WatchInterval == 0 {
		i.logger.Debug("No watcher configured, indices will not be updated (use only for testing purposes)")
		return
	}

	var err error
	t := time.NewTicker(i.options.WatchInterval)
	defer t.Stop()
	for {
		i.logger.Debug("watchIndices: start")
		func() {
			tx := i.options.APMTracer.StartTransaction("updateIndex", "backend.watcher")
			defer tx.End()

			err = i.updateIndex(apm.ContextWithTransaction(ctx, tx))
			if err != nil {
				i.logger.Error("can't update index file", zap.Error(err))
			}
		}()
		i.logger.Debug("watchIndices: finished")

		select {
		case <-ctx.Done():
			i.logger.Debug("watchIndices: quit")
			return
		case <-t.C:
		}
	}
}

// updateIndex reloads the search-index-all content when the storage cursor has changed.
func (i *Indexer) updateIndex(ctx context.Context) error {
	span, ctx := apm.StartSpan(ctx, "UpdateIndex", "app")
	defer span.End()

	i.logger.Debug("Update indices")

	start := time.Now()
	defer func() {
		metrics.StorageIndexerUpdateIndexDurationSeconds.Observe(time.Since(start).Seconds())
	}()

	bucketName, rootStoragePath, err := extractBucketNameFromURL(i.options.PackageStorageBucketInternal)
	if err != nil {
		metrics.StorageIndexerUpdateIndexErrorsTotal.Inc()
		return fmt.Errorf("can't extract bucket name from URL (url: %s): %w", i.options.PackageStorageBucketInternal, err)
	}

	storageCursor, err := loadCursor(ctx, i.logger, i.storageClient, bucketName, rootStoragePath)
	if err != nil {
		metrics.StorageIndexerUpdateIndexErrorsTotal.Inc()
		return fmt.Errorf("can't load latest cursor: %w", err)
	}

	if storageCursor.Current == i.cursor {
		i.logger.Info("cursor is up-to-date", zap.String("cursor.current", i.cursor))
		return nil
	}
	i.logger.Info("cursor will be updated", zap.String("cursor.current", i.cursor), zap.String("cursor.next", storageCursor.Current))

	anIndex, err := loadSearchIndexAll(ctx, i.logger, i.storageClient, bucketName, rootStoragePath, *storageCursor)
	if err != nil {
		metrics.StorageIndexerUpdateIndexErrorsTotal.Inc()
		return fmt.Errorf("can't load the search-index-all index content: %w", err)
	}
	if anIndex == nil {
		i.logger.Info("Downloaded new search-index-all index. No packages found.")
		return nil
	}
	i.logger.Info("Downloaded new search-index-all index", zap.String("index.packages.size", fmt.Sprintf("%d", len(*anIndex))))

	i.transformSearchIndexAllToPackages(anIndex)

	i.m.Lock()
	defer i.m.Unlock()
	i.cursor = storageCursor.Current
	i.packageList = *anIndex

	metrics.StorageIndexerUpdateIndexSuccessTotal.Inc()
	metrics.NumberIndexedPackages.Set(float64(len(i.packageList)))
	return nil
}

// Get returns the indexed packages, applying the filter from opts when present.
func (i *Indexer) Get(ctx context.Context, opts *packages.GetOptions) (packages.Packages, error) {
	start := time.Now()
	defer func() {
		metrics.IndexerGetDurationSeconds.With(prometheus.Labels{"indexer": indexerGetDurationPrometheusLabel}).Observe(time.Since(start).Seconds())
	}()

	i.m.RLock()
	defer i.m.RUnlock()

	if opts != nil && opts.Filter != nil {
		return opts.Filter.Apply(ctx, i.packageList)
	}
	return i.packageList, nil
}

// transformSearchIndexAllToPackages sets the artifact base path and remote resolver on every package.
func (i *Indexer) transformSearchIndexAllToPackages(packages *packages.Packages) {
	for _, m := range *packages {
		m.BasePath = fmt.Sprintf("%s-%s.zip", m.Name, m.Version)
		m.SetRemoteResolver(i.resolver)
	}
}
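
For orientation, here is a minimal usage sketch (not part of indexer.go) showing how this indexer might be wired into a program. The main function, import alias, bucket URL, endpoint, and watch interval below are hypothetical examples; the sketch assumes Application Default Credentials for the Google Cloud Storage client and relies only on NewIndexer, Init, and Get as defined above.

package main

import (
	"context"
	"log"
	"time"

	"cloud.google.com/go/storage"
	"go.uber.org/zap"

	storageindexer "github.com/elastic/package-registry/storage"
)

func main() {
	ctx := context.Background()

	logger, err := zap.NewProduction()
	if err != nil {
		log.Fatal(err)
	}
	defer logger.Sync()

	// The GCS client picks up Application Default Credentials (assumption for this sketch).
	client, err := storage.NewClient(ctx)
	if err != nil {
		logger.Fatal("can't create storage client", zap.Error(err))
	}

	// Hypothetical option values for illustration only.
	indexer := storageindexer.NewIndexer(logger, client, storageindexer.IndexerOptions{
		PackageStorageBucketInternal: "gs://example-internal-bucket",
		PackageStorageEndpoint:       "https://package-storage.example.com",
		WatchInterval:                1 * time.Minute,
	})

	// Init validates options, builds the resolver, loads the index once,
	// and starts the background watcher goroutine.
	if err := indexer.Init(ctx); err != nil {
		logger.Fatal("can't initialize indexer", zap.Error(err))
	}

	// Get returns the in-memory package list; passing nil applies no filter.
	pkgs, err := indexer.Get(ctx, nil)
	if err != nil {
		logger.Fatal("can't get packages", zap.Error(err))
	}
	logger.Info("indexed packages", zap.Int("count", len(pkgs)))
}

Note that a WatchInterval of 0 disables the background refresh (the code above logs that this is intended for testing), so a long-running service would normally set a positive interval.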