internal/resources/fetching/manager/manager.go (104 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package manager
import (
"context"
"fmt"
"sync"
"time"
"github.com/elastic/cloudbeat/internal/infra/clog"
"github.com/elastic/cloudbeat/internal/resources/fetching/cycle"
"github.com/elastic/cloudbeat/internal/resources/fetching/registry"
)
// Manager periodically triggers the fetchers held in a registry until its
// context is canceled.
type Manager struct {
	// log receives the manager's cycle and error messages.
	log *clog.Logger

	// timeout bounds the duration of a single fetcher run.
	timeout time.Duration
	// interval is the duration between two consecutive cycles.
	interval time.Duration

	// fetcherRegistry holds the fetchers the manager triggers.
	fetcherRegistry registry.Registry

	// ctx is the manager's lifetime context (a cancelable child of the context
	// given to NewManager); cancel ends it.
	ctx    context.Context //nolint:containedctx
	cancel context.CancelFunc
}
// NewManager builds a Manager over the given fetcher registry.
//
// The manager keeps a cancelable child of ctx, which Stop cancels. timeout
// bounds each individual fetcher run, and interval is the duration between two
// consecutive cycles. The returned error is currently always nil.
func NewManager(ctx context.Context, log *clog.Logger, interval time.Duration, timeout time.Duration, fetchers registry.Registry) (*Manager, error) {
	m := &Manager{
		log:             log,
		interval:        interval,
		timeout:         timeout,
		fetcherRegistry: fetchers,
	}
	m.ctx, m.cancel = context.WithCancel(ctx)
	return m, nil
}
// Run starts the periodic fetch loop in a background goroutine and returns
// immediately. The loop ends when the manager's context is canceled, either by
// Stop or by cancellation of the context given to NewManager.
func (m *Manager) Run() {
	loopCtx := m.ctx
	go m.fetchAndSleep(loopCtx)
}
// Stop cancels the manager's context, then stops the fetcher registry.
// Canceling the context ends the loop started by Run and propagates to
// in-flight fetcher runs. Stop does not wait for that loop to exit.
func (m *Manager) Stop() {
	cancel, fetchers := m.cancel, m.fetcherRegistry
	cancel()
	fetchers.Stop()
}
// fetchAndSleep runs fetch cycles until ctx is canceled.
//
// The first cycle starts immediately. Each following cycle is scheduled
// m.interval after the previous one started. If a cycle runs longer than
// m.interval, the next one starts once it finishes. Cycles are run
// synchronously because fetchIteration must not run in parallel with itself.
func (m *Manager) fetchAndSleep(ctx context.Context) {
	// A zero duration makes the first run start immediately.
	timer := time.NewTimer(0)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
		case <-timer.C:
		}

		// select chooses randomly when both cases are ready (e.g. ctx was
		// canceled while the previous cycle was running), so check cancellation
		// explicitly instead of starting a cycle on a dead context.
		if ctx.Err() != nil {
			m.log.Info("Fetchers manager canceled")
			return
		}

		// Re-arm before fetching so the interval is measured from cycle start.
		timer.Reset(m.interval)

		// Blocking: each fetcher is bounded by m.timeout and observes ctx, so a
		// cycle always ends and cancellation is noticed on the next loop turn.
		m.fetchIteration(ctx)
	}
}
// fetchIteration runs one fetch cycle. It refreshes the registry, runs every
// registered fetcher concurrently (each through fetchSingle), and waits for all
// of them to return. Fetcher errors are logged, not returned.
//
// All fetchers in the cycle share the same cycle.Metadata sequence, which is
// the cycle's start time in Unix seconds.
// The function must not get called in parallel.
func (m *Manager) fetchIteration(ctx context.Context) {
	m.fetcherRegistry.Update()

	// Snapshot the keys once so the logged count matches the fetchers started.
	keys := m.fetcherRegistry.Keys()
	m.log.Infof("Manager triggered fetching for %d fetchers", len(keys))

	start := time.Now()
	seq := start.Unix()
	m.log.Infof("Cycle %d has started", seq)

	wg := &sync.WaitGroup{}
	for _, key := range keys {
		wg.Add(1)
		go func(k string) {
			defer wg.Done()
			if err := m.fetchSingle(ctx, k, cycle.Metadata{Sequence: seq}); err != nil {
				m.log.Errorf("Error running fetcher for key %s: %v", k, err)
			}
		}(key)
	}
	wg.Wait()

	m.log.Infof("Manager finished waiting and sending data after %d milliseconds", time.Since(start).Milliseconds())
	m.log.Infof("Cycle %d resource fetching has ended", seq)
}
// fetchSingle runs the fetcher registered under key k, bounded by m.timeout.
//
// It returns nil without running anything when the registry says the fetcher
// should not run. Otherwise it returns the fetcher's own error (nil on
// success), or an error describing why the context ended first. On
// cancellation or an unexpected context error, ctx.Err() is wrapped with %w so
// callers can match it with errors.Is.
func (m *Manager) fetchSingle(ctx context.Context, k string, cycleMetadata cycle.Metadata) error {
	if !m.fetcherRegistry.ShouldRun(k) {
		return nil
	}

	ctx, cancel := context.WithTimeout(ctx, m.timeout)
	defer cancel()

	// The one-slot buffer lets the fetch goroutine deliver its result and exit
	// even after this function has returned on timeout/cancellation. Without it
	// the goroutine would stay blocked on the send forever. It still runs until
	// the fetcher itself returns.
	errCh := make(chan error, 1)
	go func() {
		defer close(errCh)
		errCh <- m.fetchProtected(ctx, k, cycleMetadata)
	}()

	select {
	case <-ctx.Done():
		// ctx.Err() returns the context package's sentinel values directly.
		switch err := ctx.Err(); err {
		case context.DeadlineExceeded:
			return fmt.Errorf("fetcher %s reached a timeout after %v seconds", k, m.timeout.Seconds())
		case context.Canceled:
			// Same text as before ("fetcher <k> context canceled"), now wrapped.
			return fmt.Errorf("fetcher %s %w", k, err)
		default:
			return fmt.Errorf("fetcher %s failed with an unknown error: %w", k, err)
		}
	case err := <-errCh:
		return err
	}
}
// fetchProtected runs the fetcher registered under key k through the registry.
// A panic raised on this call path is recovered and returned as an error, so
// the fetching goroutine is not brought down by it.
func (m *Manager) fetchProtected(ctx context.Context, k string, metadata cycle.Metadata) (err error) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		err = fmt.Errorf("fetcher %s recovered from panic: %v", k, r)
	}()

	err = m.fetcherRegistry.Run(ctx, k, metadata)
	return err
}