packages/packages.go (437 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package packages import ( "archive/zip" "context" "fmt" "os" "path/filepath" "slices" "strings" "time" "github.com/Masterminds/semver/v3" "github.com/prometheus/client_golang/prometheus" "go.elastic.co/apm/v2" "go.uber.org/zap" "github.com/elastic/package-registry/metrics" ) // ValidationDisabled is a flag which can disable package content validation (package, data streams, assets, etc.). var ValidationDisabled bool // Packages is a list of packages. type Packages []*Package func (p Packages) Len() int { return len(p) } func (p Packages) Swap(i, j int) { p[i], p[j] = p[j], p[i] } func (p Packages) Less(i, j int) bool { if p[i].Title != nil && p[j].Title != nil && *p[i].Title != *p[j].Title { return *p[i].Title < *p[j].Title } return p[i].Version < p[j].Version } // Join returns a set of packages that combines both sets. If there is already // a package in `p1` with the same name and version that a package in `p2`, the // latter is not added. func (p1 Packages) Join(p2 Packages) Packages { for _, p := range p2 { if p1.contains(p) { continue } p1 = append(p1, p) } return p1 } // contains returns true if `ps` contains a package with the same name and version as `p`. func (ps Packages) contains(p *Package) bool { return ps.index(p) >= 0 } // index finds if `ps` contains a package with the same name and version as `p` and // returns its index. If it is not found, it returns -1. func (ps Packages) index(p *Package) int { for i, candidate := range ps { if candidate.Name != p.Name { continue } if cv, pv := candidate.versionSemVer, p.versionSemVer; cv != nil && pv != nil { if !cv.Equal(pv) { continue } } if candidate.Version != p.Version { continue } return i } return -1 } // GetOptions can be used to pass options to Get. type GetOptions struct { // Filter to apply when querying for packages. If the filter is nil, // all packages are returned. This is different to a zero-object filter, // where experimental packages are filtered by default. Filter *Filter } // FileSystemIndexer indexes packages from the filesystem. type FileSystemIndexer struct { paths []string packageList Packages // Label used for APM instrumentation. label string // Walker function used to look for files, it returns true for paths that should be indexed. walkerFn func(basePath, path string, info os.DirEntry) (shouldIndex bool, err error) // Builder to access the files of a package in this indexer. fsBuilder FileSystemBuilder logger *zap.Logger } // NewFileSystemIndexer creates a new FileSystemIndexer for the given paths. func NewFileSystemIndexer(logger *zap.Logger, paths ...string) *FileSystemIndexer { walkerFn := func(basePath, path string, info os.DirEntry) (bool, error) { relativePath, err := filepath.Rel(basePath, path) if err != nil { return false, err } dirs := strings.Split(relativePath, string(filepath.Separator)) if len(dirs) < 2 { return false, nil // need to go to the package version level } if info.IsDir() { versionDir := dirs[1] _, err := semver.StrictNewVersion(versionDir) if err != nil { logger.Warn("ignoring unexpected directory", zap.String("file.path", path)) return false, filepath.SkipDir } return true, nil } // Unexpected file, return nil in order to continue processing sibling directories // Fixes an annoying problem when the .DS_Store file is left behind and the package // is not loading without any error information logger.Warn("ignoring unexpected file", zap.String("file.path", path)) return false, nil } return &FileSystemIndexer{ paths: paths, label: "FileSystemIndexer", walkerFn: walkerFn, fsBuilder: ExtractedFileSystemBuilder, logger: logger, } } var ExtractedFileSystemBuilder = func(p *Package) (PackageFileSystem, error) { return NewExtractedPackageFileSystem(p) } // NewZipFileSystemIndexer creates a new ZipFileSystemIndexer for the given paths. func NewZipFileSystemIndexer(logger *zap.Logger, paths ...string) *FileSystemIndexer { walkerFn := func(basePath, path string, info os.DirEntry) (bool, error) { if info.IsDir() { return false, nil } if !strings.HasSuffix(path, ".zip") { return false, nil } // Check if the file is actually a zip file. r, err := zip.OpenReader(path) if err != nil { logger.Warn("ignoring invalid zip file", zap.String("file.path", path), zap.Error(err)) return false, nil } defer r.Close() return true, nil } return &FileSystemIndexer{ paths: paths, label: "ZipFileSystemIndexer", walkerFn: walkerFn, fsBuilder: ZipFileSystemBuilder, logger: logger, } } var ZipFileSystemBuilder = func(p *Package) (PackageFileSystem, error) { return NewZipPackageFileSystem(p) } // Init initializes the indexer. func (i *FileSystemIndexer) Init(ctx context.Context) (err error) { i.packageList, err = i.getPackagesFromFileSystem(ctx) if err != nil { return fmt.Errorf("reading packages from filesystem failed: %w", err) } return nil } // Get returns a slice with packages. // Options can be used to filter the returned list of packages. When no options are passed // or they don't contain any filter, no filtering is done. // The list is stored in memory and on the second request directly served from memory. // This assumes changes to packages only happen on restart (unless development mode is enabled). // Caching the packages request many file reads every time this method is called. func (i *FileSystemIndexer) Get(ctx context.Context, opts *GetOptions) (Packages, error) { start := time.Now() defer func() { metrics.IndexerGetDurationSeconds.With(prometheus.Labels{"indexer": i.label}).Observe(time.Since(start).Seconds()) }() if opts == nil { return i.packageList, nil } if opts.Filter != nil { return opts.Filter.Apply(ctx, i.packageList) } return i.packageList, nil } func (i *FileSystemIndexer) getPackagesFromFileSystem(ctx context.Context) (Packages, error) { span, _ := apm.StartSpan(ctx, "GetFromFileSystem", "app") span.Context.SetLabel("indexer", i.label) defer span.End() type packageKey struct { name string version string } packagesFound := make(map[packageKey]struct{}) var pList Packages for _, basePath := range i.paths { packagePaths, err := i.getPackagePaths(basePath) if err != nil { return nil, err } i.logger.Info("Searching packages in " + basePath) for _, path := range packagePaths { p, err := NewPackage(i.logger, path, i.fsBuilder) if err != nil { return nil, fmt.Errorf("loading package failed (path: %s): %w", path, err) } key := packageKey{name: p.Name, version: p.Version} if _, found := packagesFound[key]; found { i.logger.Debug("duplicated package", zap.String("package.name", p.Name), zap.String("package.version", p.Version), zap.String("package.path", p.BasePath)) continue } packagesFound[key] = struct{}{} pList = append(pList, p) i.logger.Debug("found package", zap.String("package.name", p.Name), zap.String("package.version", p.Version), zap.String("package.path", p.BasePath)) } } return pList, nil } // getPackagePaths returns list of available packages, one for each version. func (i *FileSystemIndexer) getPackagePaths(packagesPath string) ([]string, error) { var foundPaths []string err := filepath.WalkDir(packagesPath, func(path string, info os.DirEntry, err error) error { if os.IsNotExist(err) { return filepath.SkipDir } if err != nil { return err } shouldIndex, err := i.walkerFn(packagesPath, path, info) if err != nil { return err } if !shouldIndex { return nil } foundPaths = append(foundPaths, path) if info.IsDir() { // If a directory is being added, consider all its contents part of // the package and continue. return filepath.SkipDir } return nil }) if err != nil { return nil, fmt.Errorf("listing packages failed (path: %s): %w", packagesPath, err) } return foundPaths, nil } // Filter can be used to filter a list of packages. type Filter struct { AllVersions bool Category string Prerelease bool KibanaVersion *semver.Version PackageName string PackageVersion string PackageType string Capabilities []string SpecMin *semver.Version SpecMax *semver.Version Discovery *discoveryFilter // Deprecated, release tags to be removed. Experimental bool } type discoveryFilter struct { Fields discoveryFilterFields } func NewDiscoveryFilter(filter string) (*discoveryFilter, error) { filterType, args, found := strings.Cut(filter, ":") if !found { return nil, fmt.Errorf("could not parse filter %q", filter) } var result discoveryFilter switch filterType { case "fields": for _, name := range strings.Split(args, ",") { result.Fields = append(result.Fields, DiscoveryField{ Name: name, }) } default: return nil, fmt.Errorf("unknown discovery filter %q", filterType) } return &result, nil } func (f *discoveryFilter) Matches(p *Package) bool { if f == nil { return true } return f.Fields.Matches(p) } type discoveryFilterFields []DiscoveryField // Matches implements matching for a collection of fields used as discovery filter. // It matches if all fields in the package are included in the list of fields in the query. func (fields discoveryFilterFields) Matches(p *Package) bool { // If the package doesn't define this filter, it doesn't match. if p.Discovery == nil || len(p.Discovery.Fields) == 0 { return false } for _, packageField := range p.Discovery.Fields { if !slices.Contains([]DiscoveryField(fields), packageField) { return false } } return true } // Apply applies the filter to the list of packages, if the filter is nil, no filtering is done. func (f *Filter) Apply(ctx context.Context, packages Packages) (Packages, error) { if f == nil { return packages, nil } span, ctx := apm.StartSpan(ctx, "FilterPackages", "app") defer span.End() if f.Experimental { return f.legacyApply(ctx, packages), nil } // Checks that only the most recent version of an integration is added to the list var packagesList Packages for _, p := range packages { // Skip experimental packages if flag is not specified. if p.Release == ReleaseExperimental && !f.Prerelease { continue } // Skip prerelease packages by default. if p.IsPrerelease() && !f.Prerelease { continue } if f.KibanaVersion != nil { if valid := p.HasKibanaVersion(f.KibanaVersion); !valid { continue } } if f.PackageName != "" && f.PackageName != p.Name { continue } if f.PackageVersion != "" && f.PackageVersion != p.Version { continue } if f.PackageType != "" && f.PackageType != p.Type { continue } if f.Capabilities != nil { if valid := p.WorksWithCapabilities(f.Capabilities); !valid { continue } } if f.Discovery != nil && !f.Discovery.Matches(p) { continue } if f.SpecMin != nil || f.SpecMax != nil { valid, err := p.HasCompatibleSpec(f.SpecMin, f.SpecMax, f.KibanaVersion) if err != nil { return nil, fmt.Errorf("can't compare spec version for %s (%s-%s): %w", p.Name, f.SpecMin, f.SpecMax, err) } if !valid { continue } } addPackage := true if !f.AllVersions { // Check if the version exists and if it should be added or not. for i, current := range packagesList { if current.Name != p.Name { continue } addPackage = false // If the package in the list is newer or equal, do nothing. if current.IsNewerOrEqual(p) { continue } // Otherwise replace it. packagesList[i] = p } } if addPackage { packagesList = append(packagesList, p) } } // Filter by category after selecting the newer packages. packagesList = filterCategories(packagesList, f.Category) return packagesList, nil } // legacyApply applies the filter to the list of packages for legacy clients using `experimental=true`. func (f *Filter) legacyApply(ctx context.Context, packages Packages) Packages { if f == nil { return packages } // Checks that only the most recent version of an integration is added to the list var packagesList Packages for _, p := range packages { // Skip experimental packages if flag is not specified. if p.Release == ReleaseExperimental && !f.Experimental { continue } if f.KibanaVersion != nil { if valid := p.HasKibanaVersion(f.KibanaVersion); !valid { continue } } if f.PackageName != "" && f.PackageName != p.Name { continue } if f.PackageVersion != "" && f.PackageVersion != p.Version { continue } if f.PackageType != "" && f.PackageType != p.Type { continue } addPackage := true if !f.AllVersions { // Check if the version exists and if it should be added or not. for i, current := range packagesList { if current.Name != p.Name { continue } addPackage = false // If the package in the list is newer or equal, do nothing, unless it is a prerelease. if current.IsPrerelease() == p.IsPrerelease() && current.IsNewerOrEqual(p) { continue } // If the package in the list is not a prerelease, and current is, do nothing. if !current.IsPrerelease() && p.IsPrerelease() { continue } // Otherwise replace it. packagesList[i] = p } } if addPackage { packagesList = append(packagesList, p) } } if f.AllVersions { packageHasNonPrerelease := make(map[string]bool) for _, p := range packagesList { if !p.IsPrerelease() { packageHasNonPrerelease[p.Name] = true } } i := 0 for _, p := range packagesList { if packageHasNonPrerelease[p.Name] && p.IsPrerelease() { continue } packagesList[i] = p i++ } packagesList = packagesList[:i] } // Filter by category after selecting the newer packages. packagesList = filterCategories(packagesList, f.Category) return packagesList } func filterCategories(packages Packages, category string) Packages { if category == "" { return packages } var result Packages for _, p := range packages { if !p.HasCategory(category) && !p.HasPolicyTemplateWithCategory(category) { continue } if !p.HasCategory(category) { p = filterPolicyTemplates(*p, category) } result = append(result, p) } return result } func filterPolicyTemplates(p Package, category string) *Package { var updatedPolicyTemplates []PolicyTemplate var updatedBasePolicyTemplates []BasePolicyTemplate for i, pt := range p.PolicyTemplates { if slices.Contains(pt.Categories, category) { updatedPolicyTemplates = append(updatedPolicyTemplates, pt) updatedBasePolicyTemplates = append(updatedBasePolicyTemplates, p.BasePackage.BasePolicyTemplates[i]) } } p.PolicyTemplates = updatedPolicyTemplates p.BasePackage.BasePolicyTemplates = updatedBasePolicyTemplates return &p } // NameVersionFilter is a helper to initialize a Filter with the usual // options to look per name and version along all packages indexed. func NameVersionFilter(name, version string) GetOptions { return GetOptions{ Filter: &Filter{ Experimental: true, Prerelease: true, PackageName: name, PackageVersion: version, }, } }