internal/resources/providers/gcplib/inventory/provider.go (440 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package inventory import ( "context" "fmt" "strings" "sync" asset "cloud.google.com/go/asset/apiv1" "cloud.google.com/go/asset/apiv1/assetpb" "github.com/googleapis/gax-go/v2" "github.com/samber/lo" "google.golang.org/api/cloudresourcemanager/v3" "google.golang.org/api/iterator" "google.golang.org/api/option" "google.golang.org/protobuf/types/known/structpb" "github.com/elastic/cloudbeat/internal/infra/clog" "github.com/elastic/cloudbeat/internal/resources/fetching" "github.com/elastic/cloudbeat/internal/resources/providers/gcplib/auth" ) type Provider struct { log *clog.Logger config auth.GcpFactoryConfig inventory *AssetsInventoryWrapper crm *ResourceManagerWrapper cloudAccountMetadataCache *MapCache[*fetching.CloudAccountMetadata] } type AssetsInventoryWrapper struct { Close func() error ListAssets func(ctx context.Context, req *assetpb.ListAssetsRequest, opts ...gax.CallOption) Iterator } type ResourceManagerWrapper struct { // returns project display name or an empty string getProjectDisplayName func(ctx context.Context, parent string) string // returns org display name or an empty string getOrganizationDisplayName func(ctx context.Context, parent string) string } type MonitoringAsset struct { CloudAccount *fetching.CloudAccountMetadata LogMetrics []*ExtendedGcpAsset `json:"log_metrics,omitempty"` Alerts []*ExtendedGcpAsset `json:"alerts,omitempty"` } type LoggingAsset struct { CloudAccount *fetching.CloudAccountMetadata LogSinks []*ExtendedGcpAsset `json:"log_sinks,omitempty"` } type ProjectPoliciesAsset struct { CloudAccount *fetching.CloudAccountMetadata Policies []*ExtendedGcpAsset `json:"policies,omitempty"` } type ServiceUsageAsset struct { CloudAccount *fetching.CloudAccountMetadata Services []*ExtendedGcpAsset `json:"services,omitempty"` } type ExtendedGcpAsset struct { *assetpb.Asset CloudAccount *fetching.CloudAccountMetadata } type ProviderInitializer struct{} type dnsPolicyFields struct { networks []string enableLogging bool } type TypeGenerator[T any] func(assets []*ExtendedGcpAsset, projectId, projectName, orgId, orgName string) *T type Iterator interface { Next() (*assetpb.Asset, error) } type ServiceAPI interface { // ListAllAssetTypesByName List all content types of the given assets types ListAllAssetTypesByName(ctx context.Context, assets []string) ([]*ExtendedGcpAsset, error) // ListMonitoringAssets List all monitoring assets by project id ListMonitoringAssets(ctx context.Context, monitoringAssetTypes map[string][]string) ([]*MonitoringAsset, error) // ListLoggingAssets returns a list of logging assets grouped by project id, extended with folder and org level log sinks ListLoggingAssets(ctx context.Context) ([]*LoggingAsset, error) // ListServiceUsageAssets returns a list of service usage assets grouped by project id ListServiceUsageAssets(ctx context.Context) ([]*ServiceUsageAsset, error) // returns a project policies for all its ancestors ListProjectsAncestorsPolicies(ctx context.Context) ([]*ProjectPoliciesAsset, error) // Close the GCP asset client Close() error } type ProviderInitializerAPI interface { // Init initializes the GCP asset client Init(ctx context.Context, log *clog.Logger, gcpConfig auth.GcpFactoryConfig) (ServiceAPI, error) } func (p *ProviderInitializer) Init(ctx context.Context, log *clog.Logger, gcpConfig auth.GcpFactoryConfig) (ServiceAPI, error) { limiter := NewAssetsInventoryRateLimiter(log) // initialize GCP assets inventory client client, err := asset.NewClient(ctx, append(gcpConfig.ClientOpts, option.WithGRPCDialOption(limiter.GetInterceptorDialOption()))...) if err != nil { return nil, err } // wrap the assets inventory client for mocking assetsInventoryWrapper := &AssetsInventoryWrapper{ Close: client.Close, ListAssets: func(ctx context.Context, req *assetpb.ListAssetsRequest, opts ...gax.CallOption) Iterator { return client.ListAssets(ctx, req, append(opts, RetryOnResourceExhausted)...) }, } // initialize GCP resource manager client var gcpClientOpt []option.ClientOption gcpClientOpt = append(append(gcpClientOpt, option.WithScopes(cloudresourcemanager.CloudPlatformReadOnlyScope)), gcpConfig.ClientOpts...) crmService, err := cloudresourcemanager.NewService(ctx, gcpClientOpt...) if err != nil { return nil, err } displayNamesCache := NewMapCache[string]() // wrap the resource manager client for mocking crmServiceWrapper := &ResourceManagerWrapper{ getProjectDisplayName: func(ctx context.Context, parent string) string { return displayNamesCache.Get(func() string { prj, err := crmService.Projects.Get(parent).Context(ctx).Do() if err != nil { log.Errorf("error fetching GCP Project: %s, error: %s", parent, err) return "" } return prj.DisplayName }, parent) }, getOrganizationDisplayName: func(ctx context.Context, parent string) string { return displayNamesCache.Get(func() string { org, err := crmService.Organizations.Get(parent).Context(ctx).Do() if err != nil { log.Errorf("error fetching GCP Org: %s, error: %s", parent, err) return "" } return org.DisplayName }, parent) }, } return &Provider{ config: gcpConfig, log: log, inventory: assetsInventoryWrapper, crm: crmServiceWrapper, cloudAccountMetadataCache: NewMapCache[*fetching.CloudAccountMetadata](), }, nil } func (p *Provider) ListAllAssetTypesByName(ctx context.Context, assetTypes []string) ([]*ExtendedGcpAsset, error) { wg := sync.WaitGroup{} var resourceAssets []*assetpb.Asset var policyAssets []*assetpb.Asset wg.Add(1) go func() { resourceAssets = p.getAllAssets(ctx, &assetpb.ListAssetsRequest{ Parent: p.config.Parent, AssetTypes: assetTypes, ContentType: assetpb.ContentType_RESOURCE, }) wg.Done() }() wg.Add(1) go func() { policyAssets = p.getAllAssets(ctx, &assetpb.ListAssetsRequest{ Parent: p.config.Parent, AssetTypes: assetTypes, ContentType: assetpb.ContentType_IAM_POLICY, }) wg.Done() }() wg.Wait() var assets []*assetpb.Asset assets = append(append(assets, resourceAssets...), policyAssets...) mergedAssets := mergeAssetContentType(assets) extendedAssets := p.extendWithCloudMetadata(ctx, mergedAssets) // Enrich network assets with dns policy p.enrichNetworkAssets(ctx, extendedAssets) return extendedAssets, nil } // ListMonitoringAssets returns a list of monitoring assets grouped by project id func (p *Provider) ListMonitoringAssets(ctx context.Context, monitoringAssetTypes map[string][]string) ([]*MonitoringAsset, error) { logMetrics, err := p.ListAllAssetTypesByName(ctx, monitoringAssetTypes["LogMetric"]) if err != nil { return nil, err } alertPolicies, err := p.ListAllAssetTypesByName(ctx, monitoringAssetTypes["AlertPolicy"]) if err != nil { return nil, err } typeGenerator := func(assets []*ExtendedGcpAsset, projectId, projectName, orgId, orgName string) *MonitoringAsset { return &MonitoringAsset{ LogMetrics: getAssetsByType(assets, MonitoringLogMetricAssetType), Alerts: getAssetsByType(assets, MonitoringAlertPolicyAssetType), CloudAccount: &fetching.CloudAccountMetadata{ AccountId: projectId, AccountName: projectName, OrganisationId: orgId, OrganizationName: orgName, }, } } var assets []*ExtendedGcpAsset assets = append(append(assets, logMetrics...), alertPolicies...) monitoringAssets := getAssetsByProject[MonitoringAsset](assets, p.log, typeGenerator) return monitoringAssets, nil } // ListLoggingAssets returns a list of logging assets grouped by project id, extended with folder and org level log sinks func (p *Provider) ListLoggingAssets(ctx context.Context) ([]*LoggingAsset, error) { logSinks, err := p.ListAllAssetTypesByName(ctx, []string{LogSinkAssetType}) if err != nil { return nil, err } typeGenerator := func(assets []*ExtendedGcpAsset, projectId, projectName, orgId, orgName string) *LoggingAsset { return &LoggingAsset{ LogSinks: assets, CloudAccount: &fetching.CloudAccountMetadata{ AccountId: projectId, AccountName: projectName, OrganisationId: orgId, OrganizationName: orgName, }, } } loggingAssets := getAssetsByProject[LoggingAsset](logSinks, p.log, typeGenerator) return loggingAssets, nil } // ListServiceUsageAssets returns a list of service usage assets grouped by project id func (p *Provider) ListServiceUsageAssets(ctx context.Context) ([]*ServiceUsageAsset, error) { services, err := p.ListAllAssetTypesByName(ctx, []string{ServiceUsageAssetType}) if err != nil { return nil, err } typeGenerator := func(assets []*ExtendedGcpAsset, projectId, projectName, orgId, orgName string) *ServiceUsageAsset { return &ServiceUsageAsset{ Services: assets, CloudAccount: &fetching.CloudAccountMetadata{ AccountId: projectId, AccountName: projectName, OrganisationId: orgId, OrganizationName: orgName, }, } } assets := getAssetsByProject[ServiceUsageAsset](services, p.log, typeGenerator) return assets, nil } func (p *Provider) Close() error { return p.inventory.Close() } // enrichNetworkAssets enriches the network assets with dns policy if exists func (p *Provider) enrichNetworkAssets(ctx context.Context, assets []*ExtendedGcpAsset) { networkAssets := getAssetsByType(assets, ComputeNetworkAssetType) if len(networkAssets) == 0 { p.log.Infof("no %s assets were listed", ComputeNetworkAssetType) return } dnsPolicyAssets := p.getAllAssets(ctx, &assetpb.ListAssetsRequest{ Parent: p.config.Parent, AssetTypes: []string{DnsPolicyAssetType}, ContentType: assetpb.ContentType_RESOURCE, }) if len(dnsPolicyAssets) == 0 { p.log.Infof("no %s assets were listed, return original assets", DnsPolicyAssetType) return } dnsPolicies := decodeDnsPolicies(dnsPolicyAssets) p.log.Infof("attempting to enrich %d %s assets with dns policy", len(assets), ComputeNetworkAssetType) for _, networkAsset := range networkAssets { networkAssetFields := networkAsset.GetResource().GetData().GetFields() networkIdentifier := strings.TrimPrefix(networkAsset.GetName(), "//compute.googleapis.com") dnsPolicy := findDnsPolicyByNetwork(dnsPolicies, networkIdentifier) if dnsPolicy != nil { p.log.Infof("enrich a %s asset with dns policy, name: %s", ComputeNetworkAssetType, networkIdentifier) networkAssetFields["enabledDnsLogging"] = &structpb.Value{Kind: &structpb.Value_BoolValue{BoolValue: dnsPolicy.enableLogging}} } } } // findDnsPolicyByNetwork finds DNS policy by network identifier func findDnsPolicyByNetwork(dnsPolicies []*dnsPolicyFields, networkIdentifier string) *dnsPolicyFields { for _, dnsPolicy := range dnsPolicies { if lo.SomeBy(dnsPolicy.networks, func(networkUrl string) bool { return strings.HasSuffix(networkUrl, networkIdentifier) }) { return dnsPolicy } } return nil } // decodeDnsPolicies gets the required fields from the dns policies assets func decodeDnsPolicies(dnsPolicyAssets []*assetpb.Asset) []*dnsPolicyFields { dnsPolicies := make([]*dnsPolicyFields, 0) for _, dnsPolicyAsset := range dnsPolicyAssets { fields := new(dnsPolicyFields) dnsPolicyData := dnsPolicyAsset.GetResource().GetData().GetFields() if attachedNetworks, exist := dnsPolicyData["networks"]; exist { networks := attachedNetworks.GetListValue().GetValues() for _, network := range networks { if networkUrl, found := network.GetStructValue().GetFields()["networkUrl"]; found { fields.networks = append(fields.networks, networkUrl.GetStringValue()) } } } if enableLogging, exist := dnsPolicyData["enableLogging"]; exist { fields.enableLogging = enableLogging.GetBoolValue() } dnsPolicies = append(dnsPolicies, fields) } return dnsPolicies } // getAssetsByProject groups assets by project, extracts metadata for each project, and adds folder and organization-level resources for each group. func getAssetsByProject[T any](assets []*ExtendedGcpAsset, log *clog.Logger, f TypeGenerator[T]) []*T { assetsByProject := lo.GroupBy(assets, func(asset *ExtendedGcpAsset) string { return asset.CloudAccount.AccountId }) enrichedAssets := make([]*T, 0, len(assetsByProject)) for projectId, projectAssets := range assetsByProject { if projectId == "" { continue } if len(projectAssets) == 0 { log.Errorf("no assets were listed for project: %s", projectId) continue } cloudAccount := projectAssets[0].CloudAccount // add folder and org level log sinks for each project projectAssets = append(projectAssets, assetsByProject[""]...) enrichedAssets = append(enrichedAssets, f( projectAssets, projectId, cloudAccount.AccountName, cloudAccount.OrganisationId, cloudAccount.OrganizationName, )) } return enrichedAssets } func (p *Provider) getAllAssets(ctx context.Context, request *assetpb.ListAssetsRequest) []*assetpb.Asset { p.log.Infof("Listing asset types: %v of type %v for %v", request.AssetTypes, request.ContentType, request.Parent) results := make([]*assetpb.Asset, 0) it := p.inventory.ListAssets(ctx, request) for { response, err := it.Next() if err == iterator.Done { break } if err != nil { p.log.Errorf("Error fetching GCP Asset: %s", err) return results } p.log.Debugf("Fetched GCP Asset: %+v", response.Name) results = append(results, response) } return results } func mergeAssetContentType(assets []*assetpb.Asset) []*assetpb.Asset { resultsMap := make(map[string]*assetpb.Asset) for _, asset := range assets { assetKey := asset.Name if _, ok := resultsMap[assetKey]; !ok { resultsMap[assetKey] = asset continue } item := resultsMap[assetKey] if asset.Resource != nil { item.Resource = asset.Resource } if asset.IamPolicy != nil { item.IamPolicy = asset.IamPolicy } } results := make([]*assetpb.Asset, 0, len(resultsMap)) for _, asset := range resultsMap { results = append(results, asset) } return results } // extends the assets with the project and organization display name func (p *Provider) extendWithCloudMetadata(ctx context.Context, assets []*assetpb.Asset) []*ExtendedGcpAsset { extendedAssets := make([]*ExtendedGcpAsset, 0, len(assets)) for _, asset := range assets { orgId := getOrganizationId(asset.Ancestors) projectId := getProjectId(asset.Ancestors) cacheKey := fmt.Sprintf("%s/%s", projectId, orgId) cloudAccount := p.cloudAccountMetadataCache.Get(func() *fetching.CloudAccountMetadata { return p.getCloudAccountMetadata(ctx, projectId, orgId) }, cacheKey) extendedAssets = append(extendedAssets, &ExtendedGcpAsset{ Asset: asset, CloudAccount: cloudAccount, }) } return extendedAssets } func (p *Provider) ListProjectsAncestorsPolicies(ctx context.Context) ([]*ProjectPoliciesAsset, error) { projects := p.getAllAssets(ctx, &assetpb.ListAssetsRequest{ ContentType: assetpb.ContentType_IAM_POLICY, Parent: p.config.Parent, AssetTypes: []string{CrmProjectAssetType}, }) p.log.Infof("Listed %d GCP projects", len(projects)) ancestorsPoliciesCache := NewMapCache[[]*ExtendedGcpAsset]() return lo.Map(projects, func(project *assetpb.Asset, _ int) *ProjectPoliciesAsset { projectAsset := p.extendWithCloudMetadata(ctx, []*assetpb.Asset{project})[0] // Skip first ancestor it as we already got it policiesAssets := append([]*ExtendedGcpAsset{projectAsset}, getAncestorsAssets(ctx, ancestorsPoliciesCache, p, project.Ancestors[1:])...) return &ProjectPoliciesAsset{CloudAccount: projectAsset.CloudAccount, Policies: policiesAssets} }), nil } func getAncestorsAssets(ctx context.Context, ancestorsPoliciesCache *MapCache[[]*ExtendedGcpAsset], p *Provider, ancestors []string) []*ExtendedGcpAsset { return lo.Flatten(lo.Map(ancestors, func(parent string, _ int) []*ExtendedGcpAsset { return ancestorsPoliciesCache.Get(func() []*ExtendedGcpAsset { var assetType string if isFolder(parent) { assetType = CrmFolderAssetType } if isOrganization(parent) { assetType = CrmOrgAssetType } return p.extendWithCloudMetadata(ctx, p.getAllAssets(ctx, &assetpb.ListAssetsRequest{ ContentType: assetpb.ContentType_IAM_POLICY, Parent: parent, AssetTypes: []string{assetType}, })) }, parent) })) } func (p *Provider) getCloudAccountMetadata(ctx context.Context, projectId string, orgId string) *fetching.CloudAccountMetadata { var orgName string var projectName string wg := sync.WaitGroup{} wg.Add(1) go func() { if isOrganization(p.config.Parent) { orgName = p.crm.getOrganizationDisplayName(ctx, fmt.Sprintf("organizations/%s", orgId)) } wg.Done() }() wg.Add(1) go func() { // some assets are not associated with a project if projectId != "" { projectName = p.crm.getProjectDisplayName(ctx, fmt.Sprintf("projects/%s", projectId)) } wg.Done() }() wg.Wait() return &fetching.CloudAccountMetadata{ AccountId: projectId, AccountName: projectName, OrganisationId: orgId, OrganizationName: orgName, } } func getOrganizationId(ancestors []string) string { last := ancestors[len(ancestors)-1] parts := strings.Split(last, "/") // organizations/1234567890 if parts[0] == "organizations" { return parts[1] } return "" } func getProjectId(ancestors []string) string { parts := strings.Split(ancestors[0], "/") // projects/1234567890 if parts[0] == "projects" { return parts[1] } return "" } func getAssetsByType(projectAssets []*ExtendedGcpAsset, assetType string) []*ExtendedGcpAsset { return lo.Filter(projectAssets, func(asset *ExtendedGcpAsset, _ int) bool { return asset.AssetType == assetType }) } func isFolder(parent string) bool { return strings.HasPrefix(parent, "folders") } func isOrganization(parent string) bool { return strings.HasPrefix(parent, "organizations") }