filebeat/registrar/registrar.go (209 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package registrar import ( "fmt" "strings" "sync" "time" "github.com/elastic/beats/v7/filebeat/input/file" "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/beats/v7/libbeat/statestore/backend" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" ) type Registrar struct { log *logp.Logger // registrar event input and output Channel chan []file.State out successLogger bufferedStateUpdates int // shutdown handling done chan struct{} wg sync.WaitGroup // state storage states *file.States // Map with all file paths inside and the corresponding state store *statestore.Store // Store keeps states in memory and on disk flushTimeout time.Duration gcEnabled, gcRequired bool } type successLogger interface { Published(n int) bool } var ( statesUpdate = monitoring.NewInt(nil, "registrar.states.update") statesCleanup = monitoring.NewInt(nil, "registrar.states.cleanup") statesCurrent = monitoring.NewInt(nil, "registrar.states.current") registryWrites = monitoring.NewInt(nil, "registrar.writes.total") registryFails = monitoring.NewInt(nil, "registrar.writes.fail") registrySuccess = monitoring.NewInt(nil, "registrar.writes.success") ) const fileStatePrefix = "filebeat::logs::" // New creates a new Registrar instance, updating the registry file on // `file.State` updates. New fails if the file can not be opened or created. func New(stateStore statestore.States, out successLogger, flushTimeout time.Duration) (*Registrar, error) { store, err := stateStore.StoreFor("") if err != nil { return nil, err } r := &Registrar{ log: logp.NewLogger("registrar"), Channel: make(chan []file.State, 1), out: out, done: make(chan struct{}), wg: sync.WaitGroup{}, states: file.NewStates(), store: store, flushTimeout: flushTimeout, } return r, nil } // GetStates return the registrar states func (r *Registrar) GetStates() []file.State { return r.states.GetStates() } // loadStates fetches the previous reading state from the configure RegistryFile file // The default file is `registry` in the data path. func (r *Registrar) loadStates() error { states, err := readStatesFrom(r.store) if err != nil { return fmt.Errorf("can not load filebeat registry state: %w", err) } r.states.SetStates(states) r.log.Infof("States Loaded from registrar: %+v", len(states)) return nil } func (r *Registrar) Start() error { // Load the previous log file locations now, for use in input err := r.loadStates() if err != nil { return fmt.Errorf("error loading state: %w", err) } r.wg.Add(1) go func() { defer r.wg.Done() r.Run() }() return nil } // Stop stops the registry. It waits until Run function finished. func (r *Registrar) Stop() { r.log.Info("Stopping Registrar") defer r.log.Info("Registrar stopped") close(r.done) r.wg.Wait() } func (r *Registrar) Run() { r.log.Debug("Starting Registrar") defer r.log.Debug("Stopping Registrar") defer r.store.Close() defer func() { if err := writeStates(r.store, r.states.GetStates()); err != nil { r.log.Errorf("Error writing stopping registrar state to statestore: %v", err) } }() var ( timer *time.Timer flushC <-chan time.Time directIn chan []file.State collectIn chan []file.State ) if r.flushTimeout <= 0 { directIn = r.Channel } else { collectIn = r.Channel } for { select { case <-r.done: r.log.Info("Ending Registrar") return case states := <-directIn: // no flush timeout configured. Directly update registry r.onEvents(states) r.commitStateUpdates() case states := <-collectIn: // flush timeout configured. Only update internal state and track pending // updates to be written to registry. r.onEvents(states) r.gcStates() if flushC == nil && len(states) > 0 { timer = time.NewTimer(r.flushTimeout) flushC = timer.C } case <-flushC: timer.Stop() r.commitStateUpdates() flushC = nil timer = nil } } } func (r *Registrar) commitStateUpdates() { // First clean up states r.gcStates() states := r.states.GetStates() statesCurrent.Set(int64(len(states))) registryWrites.Inc() if err := writeStates(r.store, states); err != nil { r.log.Errorf("Error writing registrar state to statestore: %v", err) registryFails.Inc() } r.log.Debugf("Registry file updated. %d active states.", len(states)) registrySuccess.Inc() if r.out != nil { r.out.Published(r.bufferedStateUpdates) } r.bufferedStateUpdates = 0 } // onEvents processes events received from the publisher pipeline func (r *Registrar) onEvents(states []file.State) { r.processEventStates(states) r.bufferedStateUpdates += len(states) // check if we need to enable state cleanup if !r.gcEnabled { for i := range states { if states[i].TTL >= 0 || states[i].Finished { r.gcEnabled = true break } } } r.log.Debugf("Registrar state updates processed. Count: %v", len(states)) // new set of events received -> mark state registry ready for next // cleanup phase in case gc'able events are stored in the registry. r.gcRequired = r.gcEnabled } // gcStates runs a registry Cleanup. The method check if more event in the // registry can be gc'ed in the future. If no potential removable state is found, // the gcEnabled flag is set to false, indicating the current registrar state being // stable. New registry update events can re-enable state gc'ing. func (r *Registrar) gcStates() { if !r.gcRequired { return } beforeCount := r.states.Count() cleanedStates, pendingClean := r.states.CleanupWith(func(id string) { r.store.Remove(fileStatePrefix + id) //nolint:errcheck // TODO: report error }) statesCleanup.Add(int64(cleanedStates)) r.log.Debugf( "Registrar states cleaned up. Before: %d, After: %d, Pending: %d", beforeCount, beforeCount-cleanedStates, pendingClean) r.gcRequired = false r.gcEnabled = pendingClean > 0 } // processEventStates gets the states from the events and writes them to the registrar state func (r *Registrar) processEventStates(states []file.State) { r.log.Debugf("Processing %d events", len(states)) ts := time.Now() for i := range states { r.states.UpdateWithTs(states[i], ts) statesUpdate.Add(1) } } func readStatesFrom(store *statestore.Store) ([]file.State, error) { var states []file.State err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) { if !strings.HasPrefix(key, fileStatePrefix) { return true, nil } // try to decode. Ignore faulty/incompatible values. var st file.State if err := dec.Decode(&st); err != nil { // XXX: Do we want to log here? In case we start to store other // state types in the registry, then this operation will likely fail // quite often, producing some false-positives in the logs... return true, nil //nolint:nilerr // Ignore per comment above } st.Id = key[len(fileStatePrefix):] states = append(states, st) return true, nil }) if err != nil { return nil, err } states = fixStates(states) states = resetStates(states) return states, nil } func writeStates(store backend.Store, states []file.State) error { for i := range states { key := fileStatePrefix + states[i].Id if err := store.Set(key, states[i]); err != nil { return err } } return nil }