internal/inventory/inventory.go (132 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package inventory
import (
"context"
"fmt"
"strings"
"time"
"github.com/elastic/beats/v7/libbeat/beat"
libevents "github.com/elastic/beats/v7/libbeat/beat/events"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/samber/lo"
"github.com/elastic/cloudbeat/internal/infra/clog"
)
const (
// indexTemplate is the format string for the destination index name. Its two
// %s verbs are filled by generateIndex with the slugified entity category
// and entity type, in that order.
indexTemplate = "logs-cloud_asset_inventory.asset_inventory-%s_%s-default"
// minimalPeriod is the shortest fetch period accepted by NewAssetInventory;
// shorter requested periods are raised to this value.
minimalPeriod = 30 * time.Second
)
// AssetInventory periodically runs a set of asset fetchers, buffers the
// events they emit and publishes them in batches through an AssetPublisher.
type AssetInventory struct {
// fetchers are all started, each in its own goroutine, on every fetch cycle.
fetchers []AssetFetcher
// publisher receives each batch of events built from the buffered assets.
publisher AssetPublisher
// bufferFlushInterval is the interval of the ticker in Run that flushes a
// non-empty buffer.
bufferFlushInterval time.Duration
// bufferMaxSize is the initial capacity of the buffer and the number of
// buffered assets that triggers an immediate flush.
bufferMaxSize int
// period is the interval between fetch cycles.
period time.Duration
logger *clog.Logger
// assetCh is handed to every fetcher and read by Run; Stop closes it.
assetCh chan AssetEvent
// now supplies the timestamp set on every published event.
now func() time.Time
}
// AssetFetcher is a source of asset events for the inventory.
//
// AssetInventory calls Fetch in a dedicated goroutine once per fetch cycle,
// passing the context given to Run and a send-only view of the channel that
// Run reads from.
type AssetFetcher interface {
Fetch(ctx context.Context, assetChannel chan<- AssetEvent)
}
// AssetPublisher is the sink for asset events.
//
// AssetInventory calls PublishAll with one batch of beat events per buffer
// flush.
type AssetPublisher interface {
PublishAll([]beat.Event)
}
// NewAssetInventory builds an AssetInventory that runs the given fetchers
// every period and forwards their events to publisher.
//
// A period shorter than minimalPeriod is raised to minimalPeriod. The now
// function provides the timestamp attached to every published event. The
// buffer flush interval and maximum buffer size are currently fixed values.
func NewAssetInventory(logger *clog.Logger, fetchers []AssetFetcher, publisher AssetPublisher, now func() time.Time, period time.Duration) AssetInventory {
	effectivePeriod := period
	if effectivePeriod < minimalPeriod {
		effectivePeriod = minimalPeriod
	}
	logger.Infof("Initializing Asset Inventory POC with period of %s", effectivePeriod)

	// TODO: move the buffer settings to a configuration parameter.
	const (
		defaultFlushInterval = 10 * time.Second
		defaultMaxSize       = 1600
	)

	inventory := AssetInventory{
		fetchers:            fetchers,
		publisher:           publisher,
		bufferFlushInterval: defaultFlushInterval,
		bufferMaxSize:       defaultMaxSize,
		period:              effectivePeriod,
		logger:              logger,
		assetCh:             make(chan AssetEvent),
		now:                 now,
	}
	return inventory
}
// Run drives the inventory loop until ctx is done or the asset channel is
// closed by Stop.
//
// It starts one fetch cycle immediately and another every a.period. Events
// received from the fetchers are accumulated in a buffer that is published
// when it holds a.bufferMaxSize events, or when a.bufferFlushInterval elapses
// and the buffer is not empty. When the loop terminates, whatever is still
// buffered is published once before Run returns.
func (a *AssetInventory) Run(ctx context.Context) {
	a.runAllFetchersOnce(ctx)

	assetsBuffer := make([]AssetEvent, 0, a.bufferMaxSize)

	// Stop both tickers when Run returns so they do not outlive the loop.
	flushTicker := time.NewTicker(a.bufferFlushInterval)
	defer flushTicker.Stop()
	fetcherPeriod := time.NewTicker(a.period)
	defer fetcherPeriod.Stop()

	// flush publishes the buffered assets and empties the buffer while
	// keeping its capacity for reuse.
	flush := func() {
		a.logger.Infof("Asset Inventory buffer is being flushed (assets %d)", len(assetsBuffer))
		a.publish(assetsBuffer)
		assetsBuffer = assetsBuffer[:0]
	}

	for {
		select {
		case <-ctx.Done():
			a.logger.Warnf("Asset Inventory context is done: %v", ctx.Err())
			a.publish(assetsBuffer)
			return
		case <-fetcherPeriod.C:
			a.runAllFetchersOnce(ctx)
		case <-flushTicker.C:
			if len(assetsBuffer) == 0 {
				a.logger.Debugf("Interval reached without events")
				continue
			}
			flush()
		case assetToPublish, ok := <-a.assetCh:
			if !ok {
				// A closed channel is always ready and yields zero-value
				// events; without this check the loop would spin and publish
				// empty assets until ctx is done.
				a.logger.Warnf("Asset Inventory channel is closed")
				a.publish(assetsBuffer)
				return
			}
			assetsBuffer = append(assetsBuffer, assetToPublish)
			if len(assetsBuffer) == a.bufferMaxSize {
				flush()
			}
		}
	}
}
// runAllFetchersOnce launches a single fetch cycle: every configured fetcher
// is started in its own goroutine and writes its assets to a.assetCh. The
// method returns without waiting for the fetchers to finish. It is meant to
// be invoked once per cycle, i.e. once every `a.period`.
func (a *AssetInventory) runAllFetchersOnce(ctx context.Context) {
	a.logger.Debug("Running all fetchers once")
	for i := range a.fetchers {
		// The receiver and arguments are evaluated here, before the new
		// goroutine starts, so each goroutine gets its own fetcher.
		go a.fetchers[i].Fetch(ctx, a.assetCh)
	}
}
// publish converts the given assets into beat events and hands the whole
// batch to the publisher in a single PublishAll call.
//
// Each event is routed to the index produced by generateIndex for its entity,
// is timestamped with a.now(), and carries a "related.entity" list holding the
// entity's own id followed by its related entity ids, if any.
func (a *AssetInventory) publish(assets []AssetEvent) {
	toBeatEvent := func(asset AssetEvent, _ int) beat.Event {
		// The entity's own id always comes first.
		related := append([]string{asset.Entity.Id}, asset.Entity.relatedEntityId...)

		meta := mapstr.M{libevents.FieldMetaIndex: generateIndex(asset.Entity)}
		timestamp := a.now()
		fields := mapstr.M{
			"entity":         asset.Entity,
			"event":          asset.Event,
			"cloud":          asset.Cloud,
			"host":           asset.Host,
			"network":        asset.Network,
			"user":           asset.User,
			"Attributes":     asset.RawAttributes,
			"labels":         asset.Labels,
			"tags":           asset.Tags,
			"organization":   asset.Organization,
			"fass":           asset.Fass,
			"url":            asset.URL,
			"orchestrator":   asset.Orchestrator,
			"container":      asset.Container,
			"related.entity": related,
		}

		return beat.Event{
			Meta:      meta,
			Timestamp: timestamp,
			Fields:    fields,
		}
	}

	a.publisher.PublishAll(lo.Map(assets, toBeatEvent))
}
// generateIndex returns the destination index name for an entity, obtained by
// filling indexTemplate with the slugified category and type of the entity.
func generateIndex(a Entity) string {
	category := slugfy(string(a.Category))
	kind := slugfy(string(a.Type))
	return fmt.Sprintf(indexTemplate, category, kind)
}
// slugfy lowercases s and replaces every space character with an underscore.
// Consecutive, leading and trailing spaces each become their own underscore.
func slugfy(s string) string {
	underscored := strings.ReplaceAll(s, " ", "_")
	return strings.ToLower(underscored)
}
// Stop closes the channel on which fetchers deliver their assets.
//
// NOTE(review): closing an already closed channel panics, as does sending on
// a closed channel, so Stop presumably must be called at most once and only
// when no fetcher is still sending — confirm against callers.
func (a *AssetInventory) Stop() {
	assets := a.assetCh
	close(assets)
}