filebeat/input/filestream/internal/input-logfile/store.go (490 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package input_logfile

import (
	"fmt"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/elastic/beats/v7/libbeat/common/cleanup"
	"github.com/elastic/beats/v7/libbeat/common/transform/typeconv"
	"github.com/elastic/beats/v7/libbeat/statestore"
	"github.com/elastic/elastic-agent-libs/logp"
	"github.com/elastic/elastic-agent-libs/mapstr"
	"github.com/elastic/go-concert"
	"github.com/elastic/go-concert/unison"
)

// sourceStore is a store which can access resources using the Source
// from an input.
type sourceStore struct {
	// identifier is the sourceIdentifier used to generate IDs for this store.
	identifier *sourceIdentifier
	// identifiersToTakeOver are the sourceIdentifiers of previous input
	// instances whose states this sourceStore will take over (see TakeOver).
	// It may be nil.
	identifiersToTakeOver []*sourceIdentifier

	// store is the underlying store that encapsulates
	// the in-memory and persistent store.
	store *store
}

// store encapsulates the persistent store and the in-memory state store,
// which can be ahead of the persistent store.
// The store lifetime is managed by a reference counter. Once all owners (the
// session, and the resource cleaner) have dropped ownership, backing resources
// will be released and closed.
type store struct { log *logp.Logger refCount concert.RefCount persistentStore *statestore.Store ephemeralStore *states } // states stores resource states in memory. When a cursor for an input is updated, // it's state is updated first. The entry in the persistent store 'follows' the internal state. // As long as a resources stored in states is not 'Finished', the in memory // store is assumed to be ahead (in memory and persistent state are out of // sync). type states struct { mu sync.Mutex table map[string]*resource } // resource holds the in memory state and keeps track of pending updates and inputs collecting // event for the resource its key. // A resource is assumed active for as long as at least one input has (or tries // to) acuired the lock, and as long as there are pending updateOp instances in // the pipeline not ACKed yet. The key can not gc'ed by the cleaner, as long as the resource is active. // // State chagnes and writes to the persistent store are protected using the // stateMutex, to ensure full consistency between direct writes and updates // after ACK. type resource struct { // pending counts the number of Inputs and outstanding registry updates. // as long as pending is > 0 the resource is in used and must not be garbage collected. pending atomic.Uint64 // current identity version when updated stateMutex must be locked. // Pending updates will be discarded if it is increased. version, lockedVersion uint // lock guarantees only one input create updates for this entry lock unison.Mutex // key of the resource as used for the registry. key string // stateMutex is used to lock the resource when it is update/read from // multiple go-routines like the ACK handler or the input publishing an // event. // stateMutex is used to access the fields 'stored', 'state', 'internalInSync' and 'version'. stateMutex sync.Mutex // stored indicates that the state is available in the registry file. It is false for new entries. 
stored bool // invalid indicates if the resource has been marked for deletion, if yes, it cannot be overwritten // in the persistent state. invalid bool activeCursorOperations uint internalState stateInternal // cursor states. The cursor holds the state as it is currently known to the // persistent store, while pendingCursor contains the most recent update // (in-memory state), that still needs to be synced to the persistent store. // The pendingCursor is nil if there are no pending updates. // When processing update operations on ACKs, the state is applied to cursor // first, which is finally written to the persistent store. This ensures that // we always write the complete state of the key/value pair. cursor interface{} pendingCursorValue interface{} pendingUpdate interface{} // delta value of most recent pending updateOp cursorMeta interface{} } type ( // state represents the full document as it is stored in the registry. // // The TTL and Update fields are for internal use only. // // The `Cursor` namespace is used to store the cursor information that are // required to continue processing from the last known position. Cursor // updates in the registry file are only executed after events have been // ACKed by the outputs. Therefore the cursor MUST NOT include any // information that are require to identify/track the source we are // collecting from. 
state struct { TTL time.Duration Updated time.Time Cursor interface{} Meta interface{} } stateInternal struct { TTL time.Duration Updated time.Time } ) // hook into store close for testing purposes var closeStore = (*store).close func openStore(log *logp.Logger, statestore statestore.States, prefix string) (*store, error) { ok := false persistentStore, err := statestore.StoreFor("") if err != nil { return nil, err } defer cleanup.IfNot(&ok, func() { persistentStore.Close() }) states, err := readStates(log, persistentStore, prefix) if err != nil { return nil, err } ok = true return &store{ log: log, persistentStore: persistentStore, ephemeralStore: states, }, nil } // newSourceStore store returns a souceStore that will operate on the provided // store. identifier is required and is used to generate the ID for the // resources stored on store. identifiersToTakeOver is used by the TakeOver // method when taking over states from other Filestream inputs. // identifiersToTakeOver is optional and can be nil. func newSourceStore( s *store, identifier *sourceIdentifier, identifiersToTakeOver []*sourceIdentifier, ) *sourceStore { return &sourceStore{ store: s, identifier: identifier, identifiersToTakeOver: identifiersToTakeOver, } } func (s *sourceStore) FindCursorMeta(src Source, v interface{}) error { key := s.identifier.ID(src) return s.store.findCursorMeta(key, v) } func (s *sourceStore) UpdateMetadata(src Source, v interface{}) error { key := s.identifier.ID(src) return s.store.updateMetadata(key, v) } func (s *sourceStore) Remove(src Source) error { key := s.identifier.ID(src) return s.store.remove(key) } func (s *sourceStore) ResetCursor(src Source, cur interface{}) error { key := s.identifier.ID(src) return s.store.resetCursor(key, cur) } // CleanIf sets the TTL of a resource if the predicate return true. 
func (s *sourceStore) CleanIf(pred func(v Value) bool) { s.store.ephemeralStore.mu.Lock() defer s.store.ephemeralStore.mu.Unlock() for key, res := range s.store.ephemeralStore.table { if !s.identifier.MatchesInput(key) { continue } if !res.lock.TryLock() { continue } remove := pred(res) if remove { s.store.UpdateTTL(res, 0) } res.lock.Unlock() } } // UpdateIdentifiers copies an existing resource to a new ID and marks the previous one // for removal. func (s *sourceStore) UpdateIdentifiers(getNewID func(v Value) (string, any)) { s.store.ephemeralStore.mu.Lock() defer s.store.ephemeralStore.mu.Unlock() for key, res := range s.store.ephemeralStore.table { // Entries in the registry are soft deleted, once the gcStore runs, // they're actually removed from the in-memory registry (ephemeralStore) // and marked as removed in the registry operations log. So we need // to skip all entries that were soft deleted. if res.isDeleted() { continue } if !s.identifier.MatchesInput(key) { continue } if !res.lock.TryLock() { s.store.log.Infof("cannot lock '%s', will not update registry for it", key) continue } newKey, updatedMeta := getNewID(res) if len(newKey) > 0 { if _, ok := s.store.ephemeralStore.table[newKey]; ok { res.lock.Unlock() continue } r := res.copyWithNewKey(newKey) r.cursorMeta = updatedMeta r.stored = false // writeState only writes to the log file (disk) // the write is synchronous s.store.writeState(r) // Add the new resource to the ephemeralStore so the rest of the // codebase can have access to the new value s.store.ephemeralStore.table[newKey] = r // Remove the old key from the store aka delete. This is also // synchronously written to the disk. // We cannot use store.remove because it will // acquire the same lock we hold, causing a deadlock. // See store.remove for details. // Fully remove the old resource from all stores. // - 1. Update the TLL, which soft-deletes it. This is the // mechanism used by store.remove. 
We cannot call store.remove // because it will acquire a lock we're holding. // - 2. Remove the resource from the in-memory store // - 3. Finally, synchronously remove it from the disk store. s.store.UpdateTTL(res, 0) delete(s.store.ephemeralStore.table, res.key) s.store.persistentStore.Remove(res.key) s.store.log.Infof("migrated entry in registry from '%s' to '%s'. Cursor: %v", key, newKey, r.cursor) } res.lock.Unlock() } } // TakeOver allows one Filestream input to take over states from other // Filestream inputs or the Log input. fn should return the new registry ID // and new CursorMeta. If fn returns an empty string, the entry is skipped. // // When fn returns a valid ID, the old resource is removed from both, // the in-memory and persistent store. The operations are synchronous. // // If the resource migrated was from the Log input, `TakeOver` will // remove it from the persistent store, however the Log input reigstrar // will write it back when Filebeat is shutting down. However, // there is a mechanism in place to detect this situation and avoid // migrating the same state over and over again. // See the comments on this method for more details. func (s *sourceStore) TakeOver(fn func(Value) (string, any)) { matchPreviousFilestreamIDs := func(key string) bool { for _, identifier := range s.identifiersToTakeOver { if identifier.MatchesInput(key) { return true } } return false } // Iterate through the states from any Filestream input fromFilestreamInput := map[string]struct{}{} for key, res := range s.store.ephemeralStore.table { // Entries in the registry are soft deleted, once the gcStore runs, // they're actually removed from the in-memory registry (ephemeralStore) // and marked as removed in the registry operations log. So we need // to skip all entries that were soft deleted. 
if res.isDeleted() { continue } if !matchPreviousFilestreamIDs(key) { continue } fromFilestreamInput[key] = struct{}{} } // Iterate through the whole store, no matter input type or input ID. // That's the only way to access the log input states. // We only iterate through the whole store if we're not migrating from // a Filestream input fromLogInput := map[string]logInputState{} if len(s.identifiersToTakeOver) == 0 { s.store.persistentStore.Each(func(key string, value statestore.ValueDecoder) (bool, error) { if strings.HasPrefix(key, "filebeat::logs::") { m := mapstr.M{} if err := value.Decode(&m); err != nil { return true, err } st, err := logInputStateFromMapM(m) if err != nil { // Log the error and continue s.store.log.Errorf("cannot read Log input state: %s", err) return true, nil } // That is a workaround for the problems with the // Log input Registrar (`filebeat/registrar`) and the way it // handles states. // There are two problems: // - 1. The Log input store/registrar does not have an API for // removing states // - 2. When `registrar.Registrar` starts, it copies all states // belonging to the Log input from the disk store into // memory and when the Registrar is shutting down, it // writes all states to the disk. This all happens even // if no Log input was ever started. // This means that no matter what we do here, the states from // the Log input are always re-written to disk. // See: filebeat/registrar/registrar.go, deferred statement on // `Registrar.Run`. // // However, there is a "reset state" code, that runs // during the Registrar initialisation and sets the // TTL to -2, once the Log input havesting that file starts // the TTL is set to -1 (never expires) or the configured // value. 
// See: filebeat/registrar/registrar.go (readStatesFrom) and // filebeat/beater/filebeat.go (registrar.Start()) // // This means that while the Log input is running and the file // has been active at any moment during the Filebeat's execution // the TTL is never going to be -2 during the shutdown. // // So, if TTL == -2, then in the previous run of Filebeat, there // was no Log input using this state, which likely means, it is // a state that has already been migrated to Filestream. // // The worst case that can happen is that we re-ingest the file // once, which is still better than copying an old state with // an incorrect offset every time Filebeat starts. if st.TTL == -2 { return true, nil } st.key = key fromLogInput[key] = st } return true, nil }) } // Lock the ephemeral store so we can migrate the states in one go s.store.ephemeralStore.mu.Lock() defer s.store.ephemeralStore.mu.Unlock() // Migrate all states from the Filestream input for k := range fromFilestreamInput { res := s.store.ephemeralStore.unsafeFind(k, false) if res == nil { // The resource does not exist or has been deleted. // This should never happen, but better safe than sorry continue } if !res.lock.TryLock() { res.Release() s.store.log.Infof("cannot lock '%s', will not migrate its state", k) continue } newKey, updatedMeta := fn(res) if len(newKey) > 0 { // If the new key already exists in the store, do nothing. // Unlock the resource and return if res := s.store.ephemeralStore.unsafeFind(newKey, false); res != nil { res.Release() res.lock.Unlock() continue } r := res.copyWithNewKey(newKey) r.cursorMeta = updatedMeta r.stored = false // writeState only writes to the log file (disk) // the write is synchronous s.store.writeState(r) // Add the new resource to the ephemeralStore so the rest of the // codebase can have access to the new value s.store.ephemeralStore.table[newKey] = r // Remove the old key from the store aka delete. This is also // synchronously written to the disk. 
// We cannot use store.remove because it will // acquire the same lock we hold, causing a deadlock. // See store.remove for details. // Fully remove the old resource from all stores. // - 1. Update the TTL, which soft-deletes it. // - 2. Remove the resource from the in-memory store // - 3. Finally, synchronously remove it from the disk store. s.store.UpdateTTL(res, 0) delete(s.store.ephemeralStore.table, res.key) s.store.persistentStore.Remove(res.key) s.store.log.Infof("migrated entry in registry from '%s' to '%s'. Cursor: %v", k, newKey, r.cursor) } res.Release() res.lock.Unlock() } // Migrate all states from the Log input for k, v := range fromLogInput { newKey, updatedMeta := fn(v) // Find or create a resource. It should always create a new one. res := s.store.ephemeralStore.unsafeFind(newKey, true) res.cursorMeta = updatedMeta // Convert the offset to the correct type res.cursor = struct { Offset int64 `json:"offset" struct:"offset"` }{ Offset: v.Offset, } // Write to disk s.store.writeState(res) // Update in-memory store s.store.ephemeralStore.table[newKey] = res // "remove" from the disk store. // It will add a remove entry in the log file for this key, however // the Registrar used by the Log input will write to disk all states // it read when Filebeat was starting, thus "overriding" this delete. // We keep it here because when we remove the Log input we will ensure // the entry is actually remove from the disk store. s.store.persistentStore.Remove(k) res.Release() s.store.log.Infof("migrated entry in registry from '%s' to '%s'. 
Cursor: %v", k, newKey, res.cursor) } } type logInputState struct { ID string `json:"id"` Offset int64 `json:"offset"` TTL time.Duration `json:"ttl" struct:"ttl"` key string `json:"-"` // This matches the filestream.fileMeta struct // and are used by UnpackCursorMeta Source string `json:"source" struct:"source"` IdentifierName string `json:"identifier_name" struct:"identifier_name"` } func logInputStateFromMapM(m mapstr.M) (logInputState, error) { state := logInputState{} // typeconf.Convert kept failing with an "unsupported" error because // FileStateOS was present, we don't need it, so just delete it. m.Delete("FileStateOS") //nolint:errcheck // The key is always there if err := typeconv.Convert(&state, m); err != nil { return logInputState{}, fmt.Errorf("cannot convert Log input state: %w", err) } return state, nil } // UnpackCursorMeta unpacks the cursor metadata's into the provided struct. TBD func (l logInputState) UnpackCursorMeta(to any) error { return typeconv.Convert(to, l) } // Key returns the resource's key func (l logInputState) Key() string { return l.key } func (s *store) Retain() { s.refCount.Retain() } func (s *store) Release() { if s.refCount.Release() { closeStore(s) } } func (s *store) close() { if err := s.persistentStore.Close(); err != nil { s.log.Errorf("Closing registry store did report an error: %+v", err) } } // Get returns the resource for the key. // A new shared resource is generated if the key is not known. The generated // resource is not synced to disk yet. func (s *store) Get(key string) *resource { return s.ephemeralStore.Find(key, true) } func (s *store) findCursorMeta(key string, to interface{}) error { resource := s.ephemeralStore.Find(key, false) if resource == nil { return fmt.Errorf("resource '%s' not found", key) } return typeconv.Convert(to, resource.cursorMeta) } // updateMetadata updates the cursor metadata in the persistent store. 
func (s *store) updateMetadata(key string, meta interface{}) error { resource := s.ephemeralStore.Find(key, true) if resource == nil { return fmt.Errorf("resource '%s' not found", key) } resource.cursorMeta = meta s.writeState(resource) resource.Release() return nil } // writeState writes the state to the persistent store. // WARNING! it does not lock the store or the resource. func (s *store) writeState(r *resource) { if r.invalid { return } err := s.persistentStore.Set(r.key, r.inSyncStateSnapshot()) if err != nil { s.log.Errorf("Failed to update resource fields for '%v'", r.key) } else { r.stored = true } } // resetCursor sets the cursor to the value in cur in the persistent store and // drops all pending cursor operations. func (s *store) resetCursor(key string, cur interface{}) error { r := s.ephemeralStore.Find(key, false) if r == nil { return fmt.Errorf("resource '%s' not found", key) } defer r.Release() r.stateMutex.Lock() defer r.stateMutex.Unlock() r.version++ r.UpdatesReleaseN(r.activeCursorOperations) r.activeCursorOperations = 0 r.pendingCursorValue = nil r.pendingUpdate = nil typeconv.Convert(&r.cursor, cur) //nolint:errcheck // not changing behaviour on this commit s.writeState(r) return nil } // Removes marks an entry for removal by setting its TTL to zero. func (s *store) remove(key string) error { resource := s.ephemeralStore.Find(key, false) if resource == nil { return fmt.Errorf("resource '%s' not found", key) } s.UpdateTTL(resource, 0) resource.Release() return nil } // UpdateTTL updates the time-to-live of a resource. Inactive resources with expired TTL are subject to removal. // The TTL value is part of the internal state, and will be written immediately to the persistent store. // On update the resource its `cursor` state is used, to keep the cursor state in sync with the current known // on disk store state. 
// // If the TTL of the resource is set to 0, once it is persisted, it is going to be removed from the // store in the next cleaner run. The resource also gets invalidated to make sure new updates are not // saved to the registry. func (s *store) UpdateTTL(resource *resource, ttl time.Duration) { resource.stateMutex.Lock() defer resource.stateMutex.Unlock() if resource.stored && resource.internalState.TTL == ttl { return } resource.internalState.TTL = ttl if resource.internalState.Updated.IsZero() { resource.internalState.Updated = time.Now() } s.writeState(resource) if resource.isDeleted() { // version must be incremented to make sure existing resource // instances do not overwrite the removal of the entry resource.version++ // invalidate it after it has been persisted to make sure it cannot // be overwritten in the persistent store resource.invalid = true } } // Find returns the resource for a given key. If the key is unknown and create is set to false nil will be returned. // The resource returned by Find is marked as active. (*resource).Release must be called to mark the resource as inactive again. func (s *states) Find(key string, create bool) *resource { s.mu.Lock() defer s.mu.Unlock() return s.unsafeFind(key, create) } // unsafeFind DOES NOT LOCK THE STORE!!! Only call unsafeFind if you're // currently holding the lock from states.mu. // // unsafeFind returns the resource for a given key. If the key is unknown and // create is set to false nil will be returned. // The resource returned by unsafeFind is marked as active. (*resource).Release // must be called to mark the resource as inactive again. func (s *states) unsafeFind(key string, create bool) *resource { if resource := s.table[key]; resource != nil && !resource.isDeleted() { resource.Retain() return resource } if !create { return nil } // resource is owned by table(session) and input that uses the resource. 
resource := &resource{ stored: false, key: key, lock: unison.MakeMutex(), } // -1 means this resource will not be cleaned up due to a timeout. // The zero-value for internalState.TTL means this resource is // soft-deleted. resource.internalState.TTL = -1 s.table[key] = resource resource.Retain() return resource } // IsNew returns true if we have no state recorded for the current resource. func (r *resource) IsNew() bool { r.stateMutex.Lock() defer r.stateMutex.Unlock() return r.pendingCursorValue == nil && r.pendingUpdate == nil && r.cursor == nil } func (r *resource) isDeleted() bool { return !r.internalState.Updated.IsZero() && r.internalState.TTL == 0 } // Retain is used to indicate that 'resource' gets an additional 'owner'. // Owners of an resource can be active inputs or pending update operations // not yet written to disk. func (r *resource) Retain() { r.pending.Add(1) } // Release reduced the owner ship counter of the resource. func (r *resource) Release() { r.pending.Add(^uint64(0)) } // UpdatesReleaseN is used to release ownership of N pending update operations. func (r *resource) UpdatesReleaseN(n uint) { r.pending.Add(^uint64(n - 1)) } // Finished returns true if the resource is not in use and if there are no pending updates // that still need to be written to the registry. func (r *resource) Finished() bool { return r.pending.Load() == 0 } // UnpackCursor deserializes the in memory state. func (r *resource) UnpackCursor(to interface{}) error { r.stateMutex.Lock() defer r.stateMutex.Unlock() return typeconv.Convert(to, r.activeCursor()) } // UnpackCursorMeta unpacks the cursor metadata's into the provided struct. func (r *resource) UnpackCursorMeta(to interface{}) error { return typeconv.Convert(to, r.cursorMeta) } // Key returns the resource's key func (r *resource) Key() string { return r.key } // syncStateSnapshot returns the current insync state based on already ACKed update operations. 
func (r *resource) inSyncStateSnapshot() state { return state{ TTL: r.internalState.TTL, Updated: r.internalState.Updated, Cursor: r.cursor, Meta: r.cursorMeta, } } func (r *resource) copyInto(dst *resource) { r.stateMutex.Lock() defer r.stateMutex.Unlock() internalState := r.internalState // This is required to prevent the cleaner from removing the // entry from the registry immediately. // It still might be removed if the output is blocked for a long // time. If removed the whole file is resent to the output when found/updated. internalState.Updated = time.Now() dst.stored = r.stored dst.internalState = internalState dst.activeCursorOperations = r.activeCursorOperations dst.cursor = r.cursor dst.pendingCursorValue = nil dst.pendingUpdate = nil dst.cursorMeta = r.cursorMeta // dst.lock should not be overwritten here because it's supposed to be locked // before this function call and it's important to preserve the previous value. } func (r *resource) copyWithNewKey(key string) *resource { internalState := r.internalState // This is required to prevent the cleaner from removing the // entry from the registry immediately. // It still might be removed if the output is blocked for a long // time. If removed the whole file is resent to the output when found/updated. internalState.Updated = time.Now() return &resource{ key: key, stored: r.stored, internalState: internalState, activeCursorOperations: r.activeCursorOperations, cursor: r.cursor, pendingCursorValue: nil, pendingUpdate: nil, cursorMeta: r.cursorMeta, lock: unison.MakeMutex(), } } // pendingCursor returns the current published cursor state not yet ACKed. // // Note: The stateMutex must be locked when calling pendingCursor. 
// //nolint:errcheck // not changing behaviour on this commit func (r *resource) pendingCursor() interface{} { if r.pendingUpdate != nil { var tmp interface{} typeconv.Convert(&tmp, &r.cursor) typeconv.Convert(&tmp, r.pendingUpdate) r.pendingCursorValue = tmp r.pendingUpdate = nil } return r.pendingCursorValue } // activeCursor func (r *resource) activeCursor() interface{} { if r.activeCursorOperations != 0 { return r.pendingCursor() } return r.cursor } // stateSnapshot returns the current in memory state, that already contains state updates // not yet ACKed. func (r *resource) stateSnapshot() state { return state{ TTL: r.internalState.TTL, Updated: r.internalState.Updated, Cursor: r.activeCursor(), Meta: r.cursorMeta, } } func readStates(log *logp.Logger, store *statestore.Store, prefix string) (*states, error) { keyPrefix := prefix + "::" states := &states{ table: map[string]*resource{}, } err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) { if !strings.HasPrefix(key, keyPrefix) { return true, nil } var st state if err := dec.Decode(&st); err != nil { log.Errorf("Failed to read regisry state for '%v', cursor state will be ignored. Error was: %+v", key, err) return true, nil } resource := &resource{ key: key, stored: true, lock: unison.MakeMutex(), internalState: stateInternal{ TTL: st.TTL, Updated: st.Updated, }, cursor: st.Cursor, cursorMeta: st.Meta, } states.table[resource.key] = resource return true, nil }) if err != nil { return nil, err } return states, nil }