func()

in internal/inventory/inventory.go [76:112]


// Run drives the asset inventory loop until ctx is done.
//
// It runs every fetcher once up front and then multiplexes over four events:
//   - ctx is done: the current buffer is published as-is (even if empty) and
//     Run returns;
//   - a.period elapses: all fetchers are run again;
//   - a.bufferFlushInterval elapses: the buffer is published if it holds at
//     least one event;
//   - an event arrives on a.assetCh: it is appended to the buffer, which is
//     published as soon as it holds a.bufferMaxSize events.
//
// Both tickers are stopped when Run returns.
func (a *AssetInventory) Run(ctx context.Context) {
	a.runAllFetchersOnce(ctx)

	assetsBuffer := make([]AssetEvent, 0, a.bufferMaxSize)

	flushTicker := time.NewTicker(a.bufferFlushInterval)
	defer flushTicker.Stop()

	fetcherPeriod := time.NewTicker(a.period)
	defer fetcherPeriod.Stop()

	// flush publishes the buffered events and resets the buffer length while
	// keeping the backing array, so later appends do not reallocate.
	flush := func() {
		a.logger.Infof("Asset Inventory buffer is being flushed (assets %d)", len(assetsBuffer))
		a.publish(assetsBuffer)
		assetsBuffer = assetsBuffer[:0]
	}

	for {
		select {
		case <-ctx.Done():
			a.logger.Warnf("Asset Inventory context is done: %v", ctx.Err())
			// Final publish of whatever is still buffered before shutting down.
			a.publish(assetsBuffer)
			return

		case <-fetcherPeriod.C:
			a.runAllFetchersOnce(ctx)

		case <-flushTicker.C:
			if len(assetsBuffer) == 0 {
				a.logger.Debugf("Interval reached without events")
				continue
			}
			flush()

		case assetToPublish := <-a.assetCh:
			assetsBuffer = append(assetsBuffer, assetToPublish)

			// >= rather than ==: with a zero bufferMaxSize the length is
			// already past the limit after the first append, and an equality
			// check would never trigger a size-based flush.
			if len(assetsBuffer) >= a.bufferMaxSize {
				flush()
			}
		}
	}
}