internal/resources/fetching/fetchers/gcp/assets_fetcher.go (171 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package fetchers
import (
"context"
"fmt"
"strings"
"github.com/huandu/xstrings"
"google.golang.org/protobuf/types/known/structpb"
"github.com/elastic/cloudbeat/internal/infra/clog"
"github.com/elastic/cloudbeat/internal/resources/fetching"
"github.com/elastic/cloudbeat/internal/resources/fetching/cycle"
"github.com/elastic/cloudbeat/internal/resources/providers/gcplib/inventory"
)
type GcpAssetsFetcher struct {
log *clog.Logger
resourceCh chan fetching.ResourceInfo
provider inventory.ServiceAPI
}
type GcpAsset struct {
Type string
SubType string
ExtendedAsset *inventory.ExtendedGcpAsset `json:"asset,omitempty"`
}
// GcpAssetTypes https://cloud.google.com/asset-inventory/docs/supported-asset-types
// map of types to asset types.
// sub-type is derived from asset type by using the first and last segments of the asset type name
// example: gcp-cloudkms-crypto-key
var GcpAssetTypes = map[string][]string{
fetching.ProjectManagement: {
inventory.CrmProjectAssetType,
},
fetching.KeyManagement: {
inventory.ApiKeysKeyAssetType,
inventory.CloudKmsCryptoKeyAssetType,
},
fetching.CloudIdentity: {
inventory.IamServiceAccountAssetType,
inventory.IamServiceAccountKeyAssetType,
},
fetching.CloudDatabase: {
inventory.BigqueryDatasetAssetType,
inventory.BigqueryTableAssetType,
inventory.SqlDatabaseInstanceAssetType,
},
fetching.CloudStorage: {
inventory.StorageBucketAssetType,
inventory.LogBucketAssetType,
},
fetching.CloudCompute: {
inventory.ComputeInstanceAssetType,
inventory.ComputeFirewallAssetType,
inventory.ComputeDiskAssetType,
inventory.ComputeNetworkAssetType,
inventory.ComputeBackendServiceAssetType,
inventory.ComputeSubnetworkAssetType,
},
fetching.CloudDns: {
inventory.DnsManagedZoneAssetType,
},
fetching.DataProcessing: {
inventory.DataprocClusterAssetType,
},
}
func NewGcpAssetsFetcher(_ context.Context, log *clog.Logger, ch chan fetching.ResourceInfo, provider inventory.ServiceAPI) *GcpAssetsFetcher {
return &GcpAssetsFetcher{
log: log,
resourceCh: ch,
provider: provider,
}
}
func (f *GcpAssetsFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) error {
f.log.Info("Starting GcpAssetsFetcher.Fetch")
for typeName, assetTypes := range GcpAssetTypes {
assets, err := f.provider.ListAllAssetTypesByName(ctx, assetTypes)
if err != nil {
f.log.Errorf("Failed to list assets for type %s: %s", typeName, err.Error())
continue
}
for _, asset := range assets {
select {
case <-ctx.Done():
f.log.Infof("GcpAssetsFetcher.Fetch context err: %s", ctx.Err().Error())
return nil
case f.resourceCh <- fetching.ResourceInfo{
CycleMetadata: cycleMetadata,
Resource: &GcpAsset{
Type: typeName,
SubType: getGcpSubType(asset.AssetType),
ExtendedAsset: asset,
},
}:
}
}
}
return nil
}
func (f *GcpAssetsFetcher) Stop() {
f.provider.Close()
}
func (r *GcpAsset) GetData() any {
return r.ExtendedAsset.Asset
}
func (r *GcpAsset) GetIds() []string {
return []string{r.ExtendedAsset.Name}
}
func (r *GcpAsset) GetMetadata() (fetching.ResourceMetadata, error) {
var region string
if r.ExtendedAsset.Resource != nil {
region = r.ExtendedAsset.Resource.Location
}
return fetching.ResourceMetadata{
ID: r.ExtendedAsset.Name,
Type: r.Type,
SubType: r.SubType,
Name: getAssetResourceName(r.ExtendedAsset),
Region: region,
CloudAccountMetadata: *r.ExtendedAsset.CloudAccount,
}, nil
}
func (r *GcpAsset) GetElasticCommonData() (map[string]any, error) {
m := map[string]any{}
if r.Type == fetching.CloudIdentity {
m["user.effective.id"] = r.ExtendedAsset.Name
m["user.effective.name"] = getAssetResourceName(r.ExtendedAsset)
}
if r.Type == fetching.CloudCompute && r.ExtendedAsset.AssetType == inventory.ComputeInstanceAssetType {
fields := getAssetDataFields(r.ExtendedAsset)
if fields == nil {
return m, nil
}
nameField, ok := fields["name"]
if ok {
if name := nameField.GetStringValue(); name != "" {
m["host.name"] = name
}
}
hostnameField, ok := fields["hostname"]
if ok {
if hostname := hostnameField.GetStringValue(); hostname != "" {
m["host.hostname"] = hostname
}
}
}
return m, nil
}
// try to retrieve the resource name from the asset data fields (name or displayName), in case it is not set
// get the last part of the asset name (https://cloud.google.com/apis/design/resource_names#resource_id)
func getAssetResourceName(asset *inventory.ExtendedGcpAsset) string {
fields := getAssetDataFields(asset)
if fields != nil {
if name, exist := fields["displayName"]; exist && name.GetStringValue() != "" {
return name.GetStringValue()
}
if name, exist := fields["name"]; exist && name.GetStringValue() != "" {
return name.GetStringValue()
}
}
parts := strings.Split(asset.Name, "/")
return parts[len(parts)-1]
}
func getGcpSubType(assetType string) string {
dotIndex := strings.Index(assetType, ".")
slashIndex := strings.Index(assetType, "/")
prefix := assetType[:dotIndex]
suffix := assetType[slashIndex+1:]
return strings.ToLower(fmt.Sprintf("gcp-%s-%s", prefix, xstrings.ToKebabCase(suffix)))
}
// getAssetDataFields tries to retrieve asset.resource.data fields if possible.
// Returns nil otherwise.
func getAssetDataFields(asset *inventory.ExtendedGcpAsset) map[string]*structpb.Value {
resource := asset.GetResource()
if resource == nil {
return nil
}
data := resource.GetData()
if data == nil {
return nil
}
return data.GetFields()
}