in internal/inventory/inventory.go [76:112]
func (a *AssetInventory) Run(ctx context.Context) {
a.runAllFetchersOnce(ctx)
assetsBuffer := make([]AssetEvent, 0, a.bufferMaxSize)
flushTicker := time.NewTicker(a.bufferFlushInterval)
fetcherPeriod := time.NewTicker(a.period)
for {
select {
case <-ctx.Done():
a.logger.Warnf("Asset Inventory context is done: %v", ctx.Err())
a.publish(assetsBuffer)
return
case <-fetcherPeriod.C:
a.runAllFetchersOnce(ctx)
case <-flushTicker.C:
if len(assetsBuffer) == 0 {
a.logger.Debugf("Interval reached without events")
continue
}
a.logger.Infof("Asset Inventory buffer is being flushed (assets %d)", len(assetsBuffer))
a.publish(assetsBuffer)
assetsBuffer = assetsBuffer[:0] // clear keeping cap
case assetToPublish := <-a.assetCh:
assetsBuffer = append(assetsBuffer, assetToPublish)
if len(assetsBuffer) == a.bufferMaxSize {
a.logger.Infof("Asset Inventory buffer is being flushed (assets %d)", len(assetsBuffer))
a.publish(assetsBuffer)
assetsBuffer = assetsBuffer[:0] // clear keeping cap
}
}
}
}