libbeat/publisher/pipeline/pipeline.go (187 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Package pipeline combines all publisher functionality (processors, queue,
// outputs) to create instances of complete publisher pipelines that beats
// can connect to for publishing events.
package pipeline
import (
"fmt"
"time"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/processing"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue"
"github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)
// Pipeline implementation providing all beats publisher functionality.
// The pipeline consists of clients, processors, a central queue, an output
// controller and the actual outputs.
// The queue implementing the queue.Queue interface is the most central entity
// to the pipeline, providing support for pushing, batching and pulling events.
// The pipeline adds different ACKing strategies and wait close support on top
// of the queue. For handling ACKs, the pipeline keeps track of filtered out events,
// to be ACKed to the client in correct order.
// The output controller configures a (potentially reloadable) set of load
// balanced output clients. Events will be pulled from the queue and pushed to
// the output clients using a shared work queue for the active outputs.Group.
// Processors in the pipeline are executed in the clients go-routine, before
// entering the queue. No filtering/processing will occur on the output side.
//
// For client connecting to this pipeline, the default PublishMode is
// OutputChooses.
type Pipeline struct {
	beatInfo beat.Info

	monitors Monitors

	// outputController owns the queue and the active output workers.
	outputController *outputController

	// observer receives pipeline-level events for metrics reporting;
	// defaults to nilObserver when no metrics registry is configured.
	observer observer

	// If waitCloseTimeout is positive, then the pipeline will wait up to the
	// specified time when it is closed for pending events to be acknowledged.
	waitCloseTimeout time.Duration

	// processors builds the per-client event processing chains.
	processors processing.Supporter
}
// Settings is used to pass additional settings to a newly created pipeline instance.
type Settings struct {
	// WaitClose sets the maximum duration to block when clients or pipeline itself is closed.
	// When and how WaitClose is applied depends on WaitCloseMode.
	WaitClose time.Duration

	// WaitCloseMode selects whether WaitClose applies to the pipeline itself.
	WaitCloseMode WaitCloseMode

	// Processors supplies the event processing support for clients
	// connecting to this pipeline.
	Processors processing.Supporter

	// InputQueueSize is forwarded to the output controller when creating
	// the queue.
	InputQueueSize int
}
// WaitCloseMode enumerates the possible behaviors of WaitClose in a pipeline.
type WaitCloseMode uint8

const (
	// NoWaitOnClose disable wait close in the pipeline. Clients can still
	// selectively enable WaitClose when connecting to the pipeline.
	NoWaitOnClose WaitCloseMode = iota

	// WaitOnPipelineClose applies WaitClose to the pipeline itself, waiting for outputs
	// to ACK any outstanding events. This is independent of Clients asking for
	// ACK and/or WaitClose. Clients can still optionally configure WaitClose themselves.
	WaitOnPipelineClose
)
// OutputReloader interface, that can be queried from an active publisher pipeline.
// The output reloader can be used to change the active output.
type OutputReloader interface {
	// Reload replaces the active output with one built by factory from the
	// given configuration.
	Reload(
		cfg *reload.ConfigWithMeta,
		factory func(outputs.Observer, conf.Namespace) (outputs.Group, error),
	) error
}
// New creates a new Pipeline instance from a queue config and a set of outputs.
// The new pipeline will take ownership of queue and outputs. On Close, the
// queue and outputs will be closed.
func New(
	beat beat.Info,
	monitors Monitors,
	userQueueConfig conf.Namespace,
	out outputs.Group,
	settings Settings,
) (*Pipeline, error) {
	if monitors.Logger == nil {
		monitors.Logger = logp.NewLogger("publish")
	}

	p := &Pipeline{
		beatInfo:   beat,
		monitors:   monitors,
		observer:   nilObserver,
		processors: settings.Processors,
	}
	// Only apply the WaitClose timeout when WaitOnPipelineClose is requested.
	// Otherwise waitCloseTimeout stays zero, so Close does not block.
	// (Initializing it unconditionally from settings.WaitClose would make
	// Close block even in NoWaitOnClose mode, contradicting its docs.)
	if settings.WaitCloseMode == WaitOnPipelineClose && settings.WaitClose > 0 {
		p.waitCloseTimeout = settings.WaitClose
	}
	if monitors.Metrics != nil {
		p.observer = newMetricsObserver(monitors.Metrics)
	}

	// Convert the raw queue config to a parsed Settings object that will
	// be used during queue creation. This lets us fail immediately on startup
	// if there's a configuration problem.
	queueType := defaultQueueType
	if b := userQueueConfig.Name(); b != "" {
		queueType = b
	}
	queueFactory, err := queueFactoryForUserConfig(queueType, userQueueConfig.Config())
	if err != nil {
		return nil, err
	}

	output, err := newOutputController(beat, monitors, p.observer, queueFactory, settings.InputQueueSize)
	if err != nil {
		return nil, err
	}
	p.outputController = output
	p.outputController.Set(out)

	return p, nil
}
// Close stops the pipeline, outputs and queue.
// If WaitClose with WaitOnPipelineClose mode is configured, Close will block
// for a duration of WaitClose, if there are still active events in the pipeline.
// Note: clients must be closed before calling Close.
func (p *Pipeline) Close() error {
	p.monitors.Logger.Debug("close pipeline")

	// Active clients are not closed / disconnected here; that is the
	// caller's responsibility (see note above).
	p.outputController.WaitClose(p.waitCloseTimeout)
	p.observer.cleanup()
	return nil
}
// Connect creates a new client with default settings.
func (p *Pipeline) Connect() (beat.Client, error) {
	var defaultConfig beat.ClientConfig
	return p.ConnectWith(defaultConfig)
}
// ConnectWith create a new Client for publishing events to the pipeline.
// The client behavior on close and ACK handling can be configured by setting
// the appropriate fields in provided ClientConfig.
// If not set otherwise the default publish mode is OutputChooses.
//
// It is responsibility of the caller to close the client.
func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
	var (
		canDrop    bool
		eventFlags publisher.EventFlags
	)

	err := validateClientConfig(&cfg)
	if err != nil {
		return nil, err
	}

	// Translate the requested publish mode into internal event flags.
	switch cfg.PublishMode {
	case beat.GuaranteedSend:
		eventFlags = publisher.GuaranteedSend
	case beat.DropIfFull:
		canDrop = true
	}

	waitClose := cfg.WaitClose

	// Build the per-client processing chain; events pass through it before
	// being published to the queue.
	processors, err := p.createEventProcessing(cfg.Processing, publishDisabled)
	if err != nil {
		return nil, err
	}

	// Fall back to a no-op listener so the client never has to nil-check it.
	clientListener := cfg.ClientListener
	if clientListener == nil {
		clientListener = noopClientListener{}
	}

	client := &client{
		logger:         p.monitors.Logger,
		clientListener: clientListener,
		processors:     processors,
		eventFlags:     eventFlags,
		canDrop:        canDrop,
		observer:       p.observer,
	}
	client.isOpen.Store(true)

	ackHandler := cfg.EventListener

	// If the client requested WaitClose, wrap its listener so that closing
	// the client can wait (up to waitClose) for outstanding events to be
	// acknowledged.
	var waiter *clientCloseWaiter
	if waitClose > 0 {
		waiter = newClientCloseWaiter(waitClose)
		if ackHandler == nil {
			ackHandler = waiter
		} else {
			ackHandler = acker.Combine(waiter, ackHandler)
		}
	}

	// NOTE: the ACK callback captures the ackHandler *variable*, not its
	// current value, so the acker.Nil() assignment below is visible inside
	// the callback; the nil-check here guards the window before that
	// assignment takes effect.
	producerCfg := queue.ProducerConfig{
		ACK: func(count int) {
			client.observer.eventsACKed(count)
			if ackHandler != nil {
				ackHandler.ACKEvents(count)
			}
		},
	}

	if ackHandler == nil {
		ackHandler = acker.Nil()
	}

	client.eventListener = ackHandler
	client.waiter = waiter
	client.producer = p.outputController.queueProducer(producerCfg)
	if client.producer == nil {
		// This can only happen if the pipeline was shut down while clients
		// were still waiting to connect.
		return nil, fmt.Errorf("client failed to connect because the pipeline is shutting down")
	}
	p.observer.clientConnected()

	return client, nil
}
// createEventProcessing builds the processing chain for a single client from
// the pipeline-wide processing supporter. It returns (nil, nil) when the
// pipeline has no processing support configured.
func (p *Pipeline) createEventProcessing(cfg beat.ProcessingConfig, noPublish bool) (beat.Processor, error) {
	if supporter := p.processors; supporter != nil {
		return supporter.Create(cfg, noPublish)
	}
	return nil, nil
}
// OutputReloader returns a reloadable object for the output section of this pipeline.
// The returned value is the pipeline's output controller, which implements
// the OutputReloader interface.
func (p *Pipeline) OutputReloader() OutputReloader {
	return p.outputController
}
// queueFactoryForUserConfig parses the given config and returns a
// QueueFactory based on it.
// This helper exists to frontload config parsing errors: if there is an
// error in the queue config, we want it to show up as fatal during
// initialization, even if the queue itself isn't created until later.
func queueFactoryForUserConfig(queueType string, userConfig *conf.C) (queue.QueueFactory, error) {
	switch queueType {
	case memqueue.QueueType:
		s, err := memqueue.SettingsForUserConfig(userConfig)
		if err != nil {
			return nil, err
		}
		return memqueue.FactoryForSettings(s), nil

	case diskqueue.QueueType:
		s, err := diskqueue.SettingsForUserConfig(userConfig)
		if err != nil {
			return nil, err
		}
		return diskqueue.FactoryForSettings(s), nil
	}
	// Anything else is a configuration error.
	return nil, fmt.Errorf("unrecognized queue type '%v'", queueType)
}
// noopClientListener is the fallback listener installed by ConnectWith when
// ClientConfig.ClientListener is unset; every callback is a no-op.
type noopClientListener struct{}

func (n noopClientListener) Closing()   {}
func (n noopClientListener) Closed()    {}
func (n noopClientListener) NewEvent()  {}
func (n noopClientListener) Filtered()  {}
func (n noopClientListener) Published() {}

func (n noopClientListener) DroppedOnPublish(beat.Event) {}