x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go (221 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. package pipelinemanager import ( "context" "encoding/binary" "fmt" "io" "os" "path/filepath" "sync" "github.com/mitchellh/hashstructure" "github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipereader" "github.com/elastic/elastic-agent-libs/config" "github.com/docker/docker/api/types/plugins/logdriver" "github.com/docker/docker/daemon/logger" "github.com/docker/docker/daemon/logger/jsonfilelog" protoio "github.com/gogo/protobuf/io" "github.com/elastic/beats/v7/libbeat/publisher/pipeline" "github.com/elastic/elastic-agent-libs/logp" ) // containerConfig is the config.C unpacking type type containerConfig struct { Pipeline pipeline.Config `config:"pipeline"` Output config.Namespace `config:"output"` } // Pipeline represents a single pipeline and the count of associated clients type Pipeline struct { pipeline *pipeline.Pipeline refCount int } // PipelineManager is a handler into the map of pipelines used by the plugin type PipelineManager struct { mu sync.Mutex Logger *logp.Logger // pipelines key: config hash pipelines map[uint64]*Pipeline // clients config: filepath clients map[string]*ClientLogger // Client Logger key: container hash clientLogger map[string]logger.Logger // logDirectory is the bindmount for local container logsd logDirectory string // destroyLogsOnStop indicates for the client to remove log files when a container stops destroyLogsOnStop bool // hostname of the docker host hostname string } // NewPipelineManager creates a new Pipeline map func NewPipelineManager(logDestroy bool, hostname string) *PipelineManager { return &PipelineManager{ Logger: logp.NewLogger("PipelineManager"), pipelines: make(map[uint64]*Pipeline), clients: make(map[string]*ClientLogger), clientLogger: make(map[string]logger.Logger), logDirectory: "/var/log/docker/containers", destroyLogsOnStop: logDestroy, hostname: hostname, } } // CloseClientWithFile closes the client with the associated file func (pm *PipelineManager) CloseClientWithFile(file string) error { cl, err := pm.removeClient(file) if err != nil { return fmt.Errorf("Error removing client: %w", err) } hash := cl.pipelineHash // remove the logger pm.removeLogger(cl.ContainerMeta) pm.Logger.Debugf("Closing Client first from pipelineManager") err = cl.Close() if err != nil { return fmt.Errorf("error closing client: %w", err) } // if the pipeline is no longer in use, clean up pm.removePipelineIfNeeded(hash) return nil } // CreateClientWithConfig gets the pipeline linked to the given config, and creates a client // If no pipeline for that config exists, it creates one. func (pm *PipelineManager) CreateClientWithConfig(containerConfig ContainerOutputConfig, info logger.Info, file string) (*ClientLogger, error) { hashstring, err := hashstructure.Hash(containerConfig, nil) if err != nil { return nil, fmt.Errorf("error creating config hash: %w", err) } pipeline, err := pm.getOrCreatePipeline(containerConfig, hashstring) if err != nil { return nil, fmt.Errorf("error getting pipeline: %w", err) } reader, err := pipereader.NewReaderFromPath(file) if err != nil { return nil, fmt.Errorf("error creating reader for docker log stream: %w", err) } // Why is this empty by default? What should be here? Who knows! if info.LogPath == "" { info.LogPath = filepath.Join(pm.logDirectory, info.ContainerID, fmt.Sprintf("%s-json.log", info.ContainerID)) } err = os.MkdirAll(filepath.Dir(info.LogPath), 0755) if err != nil { return nil, fmt.Errorf("error creating directory for local logs: %w", err) } // set a default log size if _, ok := info.Config["max-size"]; !ok { info.Config["max-size"] = "10M" } // set a default log count if _, ok := info.Config["max-file"]; !ok { info.Config["max-file"] = "5" } localLog, err := jsonfilelog.New(info) if err != nil { return nil, fmt.Errorf("error creating local log: %w", err) } //actually get to crafting the new client. cl, err := newClientFromPipeline(pipeline.pipeline, reader, hashstring, info, localLog, pm.hostname) if err != nil { return nil, fmt.Errorf("error creating client: %w", err) } pm.registerClient(cl, hashstring, file) pm.registerLogger(localLog, info) return cl, nil } // CreateReaderForContainer responds to docker logs requests to pull local logs from the json logger func (pm *PipelineManager) CreateReaderForContainer(info logger.Info, config logger.ReadConfig) (io.ReadCloser, error) { logObject, exists := pm.getLogger(info) if !exists { return nil, fmt.Errorf("Could not find logger for %s", info.ContainerID) } pipeReader, pipeWriter := io.Pipe() logReader, ok := logObject.(logger.LogReader) if !ok { return nil, fmt.Errorf("logger does not support reading") } go func() { ctx := context.Background() watcher := logReader.ReadLogs(ctx, config) enc := protoio.NewUint32DelimitedWriter(pipeWriter, binary.BigEndian) defer enc.Close() defer watcher.ConsumerGone() var rawLog logdriver.LogEntry for { select { case msg, ok := <-watcher.Msg: if !ok { pipeWriter.Close() return } rawLog.Line = msg.Line rawLog.Partial = msg.PLogMetaData != nil rawLog.TimeNano = msg.Timestamp.UnixNano() rawLog.Source = msg.Source if err := enc.WriteMsg(&rawLog); err != nil { pipeWriter.CloseWithError(err) return } case err := <-watcher.Err: pipeWriter.CloseWithError(err) return } } }() return pipeReader, nil } //=================== // Private methods // checkAndCreatePipeline performs the pipeline check and creation as one atomic operation // It will either return a new pipeline, or an existing one from the pipeline map func (pm *PipelineManager) getOrCreatePipeline(logOptsConfig ContainerOutputConfig, hash uint64) (*Pipeline, error) { pm.mu.Lock() defer pm.mu.Unlock() var pipeline *Pipeline var err error pipeline, test := pm.pipelines[hash] if !test { pipeline, err = loadNewPipeline(logOptsConfig, pm.hostname, pm.Logger) if err != nil { return nil, fmt.Errorf("error loading pipeline: %w", err) } pm.pipelines[hash] = pipeline } return pipeline, nil } // getClient gets a pipeline client based on a file handle func (pm *PipelineManager) getClient(file string) (*ClientLogger, bool) { pm.mu.Lock() defer pm.mu.Unlock() cli, exists := pm.clients[file] return cli, exists } func (pm *PipelineManager) getLogger(info logger.Info) (logger.Logger, bool) { pm.mu.Lock() defer pm.mu.Unlock() logger, exists := pm.clientLogger[info.ContainerID] return logger, exists } // removePipeline removes a pipeline from the manager if it's refcount is zero. func (pm *PipelineManager) removePipelineIfNeeded(hash uint64) { pm.mu.Lock() defer pm.mu.Unlock() //if the pipeline is no longer in use, clean up if pm.pipelines[hash].refCount < 1 { pipeline := pm.pipelines[hash].pipeline delete(pm.pipelines, hash) //pipelines must be closed after clients //Just do this here, since the caller doesn't know if we need to close the libbeat pipeline pm.Logger.Debugf("Pipeline closing from removePipelineIfNeeded") err := pipeline.Close() if err != nil { pm.Logger.Errorf("Error closing pipeline: %s", err) } } } // registerClient registers a new client with the manager. Up to the caller to actually close the libbeat client func (pm *PipelineManager) registerClient(cl *ClientLogger, hash uint64, clientFile string) { pm.mu.Lock() defer pm.mu.Unlock() pm.clients[clientFile] = cl pm.pipelines[hash].refCount++ } // registerLogger registers a local logger used for reading back logs func (pm *PipelineManager) registerLogger(log logger.Logger, info logger.Info) { pm.mu.Lock() defer pm.mu.Unlock() pm.clientLogger[info.ContainerID] = log } // removeLogger removes a logging instace func (pm *PipelineManager) removeLogger(info logger.Info) { pm.mu.Lock() defer pm.mu.Unlock() logger, exists := pm.clientLogger[info.ContainerID] if !exists { return } logger.Close() delete(pm.clientLogger, info.ContainerID) if pm.destroyLogsOnStop { pm.removeLogFile(info.ContainerID) } } // removeLogFile removes a log file for a given container. Disabled by default. func (pm *PipelineManager) removeLogFile(id string) error { toRemove := filepath.Join(pm.logDirectory, id) return os.Remove(toRemove) } // removeClient deregisters a client func (pm *PipelineManager) removeClient(file string) (*ClientLogger, error) { pm.mu.Lock() defer pm.mu.Unlock() cl, ok := pm.clients[file] if !ok { return nil, fmt.Errorf("No client for file %s", file) } // deincrement the ref count hash := cl.pipelineHash pm.pipelines[hash].refCount-- delete(pm.clients, file) return cl, nil }