filebeat/fileset/factory.go (145 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package fileset
import (
"fmt"
"sync"
"github.com/gofrs/uuid/v5"
"github.com/mitchellh/hashstructure"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
)
var (
	// moduleList holds the module names that running inputsRunner instances
	// add on Start and remove on Stop; its Report method is what gets exposed
	// through monitoring by RegisterMonitoringModules.
	moduleList = monitoring.NewUniqueList()

	// moduleListMetricsOnce guards the monitoring registration so that it
	// happens at most once per process.
	moduleListMetricsOnce sync.Once
)
// RegisterMonitoringModules registers the modules list with the monitoring system.
func RegisterMonitoringModules(namespace string) {
moduleListMetricsOnce.Do(func() {
monitoring.NewFunc(monitoring.GetNamespace("state").GetRegistry(), namespace, moduleList.Report, monitoring.Report)
})
}
// Factory builds runners for modules. Create turns a module config into a
// runner wrapping one input per input configuration of the module, and
// CheckConfig validates a module config without building a runner.
type Factory struct {
// beatInfo is handed to NewModuleRegistry and supplies the logger used by
// the runners this factory creates.
beatInfo beat.Info
// pipelineLoaderFactory is copied into every created runner; a nil value
// makes the runner skip pipeline loading on Start.
pipelineLoaderFactory PipelineLoaderFactory
// overwritePipelines is copied into every created runner and forwarded to
// LoadPipelines.
overwritePipelines bool
// pipelineCallbackID is the initial callback ID copied into every created
// runner; NewFactory sets it to uuid.Nil.
pipelineCallbackID uuid.UUID
// inputFactory creates and validates the individual inputs from the input
// configurations returned by the module registry.
inputFactory cfgfile.RunnerFactory
}
// inputsRunner wraps a slice of inputs and implements the cfgfile.Runner
// interface, so that all inputs of one module config are started and stopped
// together.
type inputsRunner struct {
// id is the hash of the unpacked module config, computed in Factory.Create.
id uint64
// moduleRegistry is the registry built from the module config; it loads the
// pipelines and provides the module names reported to monitoring.
moduleRegistry *ModuleRegistry
// inputs are the wrapped runners, one per input configuration.
inputs []cfgfile.Runner
// pipelineLoaderFactory, when non-nil, is used by Start to obtain a pipeline
// loader and enables the Elasticsearch connect callback registration.
pipelineLoaderFactory PipelineLoaderFactory
// pipelineCallbackID is the ID returned when Start registers the connect
// callback; Stop deregisters the callback when it is not uuid.Nil.
pipelineCallbackID uuid.UUID
// overwritePipelines is forwarded to every LoadPipelines call.
overwritePipelines bool
// log is the logger used to report pipeline loading and callback
// registration errors.
log *logp.Logger
}
// NewFactory returns a Factory that creates module inputs through
// inputFactory.
//
// beatInfo, pipelineLoaderFactory and overwritePipelines are stored as given
// and handed on to the runners the factory creates. The pipeline callback ID
// starts out as uuid.Nil.
func NewFactory(
	inputFactory cfgfile.RunnerFactory,
	beatInfo beat.Info,
	pipelineLoaderFactory PipelineLoaderFactory,
	overwritePipelines bool,
) *Factory {
	factory := new(Factory)
	factory.inputFactory = inputFactory
	factory.beatInfo = beatInfo
	factory.pipelineLoaderFactory = pipelineLoaderFactory
	factory.pipelineCallbackID = uuid.Nil
	factory.overwritePipelines = overwritePipelines
	return factory
}
// Create builds a runner for the module described by c.
//
// It creates a module registry for the config, derives the runner ID by
// hashing the unpacked config, and asks the input factory to create one input
// (connected to p) per input configuration returned by the registry. The
// returned runner wraps all of those inputs together with the registry.
//
// Any failure aborts creation: a nil runner is returned together with an error
// that wraps the underlying cause with context.
func (f *Factory) Create(p beat.PipelineConnector, c *conf.C) (cfgfile.Runner, error) {
	m, pConfigs, err := f.createRegistry(c)
	if err != nil {
		return nil, fmt.Errorf("could not create module registry for filesets: %w", err)
	}

	// The runner ID is a hash over the whole unpacked module config.
	var h map[string]interface{}
	if err = c.Unpack(&h); err != nil {
		return nil, fmt.Errorf("failed to unpack config: %w", err)
	}
	id, err := hashstructure.Hash(h, nil)
	if err != nil {
		// Wrapped like every other error path in this function so the caller
		// can tell which step failed.
		return nil, fmt.Errorf("failed to hash module config: %w", err)
	}

	inputs := make([]cfgfile.Runner, len(pConfigs))
	for i, pConfig := range pConfigs {
		inputs[i], err = f.inputFactory.Create(p, pConfig)
		if err != nil {
			return nil, fmt.Errorf("failed to create input: %w", err)
		}
	}

	return &inputsRunner{
		id:                    id,
		moduleRegistry:        m,
		inputs:                inputs,
		pipelineLoaderFactory: f.pipelineLoaderFactory,
		pipelineCallbackID:    f.pipelineCallbackID,
		overwritePipelines:    f.overwritePipelines,
		log:                   f.beatInfo.Logger.Named(logName),
	}, nil
}
// CheckConfig validates the module config c without creating a runner.
//
// It builds a module registry for the config and lets the input factory check
// every input configuration the registry returns, stopping at the first one
// that is rejected. The returned error wraps the underlying cause; nil means
// all input configurations passed.
func (f *Factory) CheckConfig(c *conf.C) error {
	_, inputConfigs, err := f.createRegistry(c)
	if err != nil {
		return fmt.Errorf("could not create module registry for filesets: %w", err)
	}

	for i := range inputConfigs {
		if checkErr := f.inputFactory.CheckConfig(inputConfigs[i]); checkErr != nil {
			return fmt.Errorf("error checking input configuration: %w", checkErr)
		}
	}
	return nil
}
// createRegistry builds a module registry from the single module config c and
// returns it together with the input configurations obtained from it. On any
// error both results are nil and the error is returned as is.
func (f *Factory) createRegistry(c *conf.C) (*ModuleRegistry, []*conf.C, error) {
	moduleConfigs := []*conf.C{c}

	reg, err := NewModuleRegistry(moduleConfigs, f.beatInfo, false, FilesetOverrides{})
	if err != nil {
		return nil, nil, err
	}

	inputConfigs, err := reg.GetInputConfigs()
	if err != nil {
		return nil, nil, err
	}

	return reg, inputConfigs, nil
}
// Start loads the module's pipelines (when a pipeline loader factory is set),
// starts every wrapped input and then adds the modules of the registry to the
// monitored module list. Pipeline and callback errors are logged and do not
// prevent the inputs from starting.
func (p *inputsRunner) Start() {
	if newLoader := p.pipelineLoaderFactory; newLoader != nil {
		// Try to load the pipelines right away, on start as well as after a
		// reload, so that a previously unavailable ES gets another chance.
		// A failing factory means ES is not available at the moment and no
		// pipeline loader can be created. The connect callback below is
		// registered regardless, so the pipelines can still be loaded once ES
		// becomes reachable.
		loader, loaderErr := newLoader()
		switch {
		case loaderErr != nil:
			p.log.Errorf("Error loading pipeline: %s", loaderErr)
		default:
			// A load failure is only logged; startup continues.
			if loadErr := p.moduleRegistry.LoadPipelines(loader, p.overwritePipelines); loadErr != nil {
				p.log.Errorf("Error loading pipeline: %s", loadErr)
			}
		}

		// Retry the pipeline load whenever a connection to ES is established.
		onConnect := func(esClient *eslegclient.Connection) error {
			return p.moduleRegistry.LoadPipelines(esClient, p.overwritePipelines)
		}
		callbackID, registerErr := elasticsearch.RegisterConnectCallback(onConnect)
		p.pipelineCallbackID = callbackID
		if registerErr != nil {
			p.log.Errorf("Error registering connect callback for Elasticsearch to load pipelines: %v", registerErr)
		}
	}

	runners := p.inputs
	for i := range runners {
		runners[i].Start()
	}

	// Report every module of this runner as active.
	for _, mod := range p.moduleRegistry.registry {
		moduleList.Add(mod.config.Module)
	}
}
// Stop deregisters the Elasticsearch connect callback (if one was registered),
// stops every wrapped input and removes the modules of the registry from the
// monitored module list.
func (p *inputsRunner) Stop() {
	if callbackID := p.pipelineCallbackID; callbackID != uuid.Nil {
		elasticsearch.DeregisterConnectCallback(callbackID)
	}

	runners := p.inputs
	for i := range runners {
		runners[i].Stop()
	}

	// Withdraw every module of this runner from the active list.
	for _, mod := range p.moduleRegistry.registry {
		moduleList.Remove(mod.config.Module)
	}
}
// String describes the runner using the info string of its module registry.
func (p *inputsRunner) String() string {
	info := p.moduleRegistry.InfoString()
	return info
}