x-pack/filebeat/input/entityanalytics/provider/activedirectory/statestore.go (150 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package activedirectory

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/internal/kvstore"
	"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider/activedirectory/internal/activedirectory"
)

// Bucket names and keys used to address values in the kvstore database.
var (
	// usersBucket holds one entry per user, keyed by the user's ID.
	usersBucket = []byte("users")
	// stateBucket holds the time values stored under the keys below.
	stateBucket = []byte("state")

	whenChangedKey = []byte("when_changed")
	lastSyncKey    = []byte("last_sync")
	lastUpdateKey  = []byte("last_update")
)

//go:generate stringer -type State
//go:generate go-licenser -license Elastic

// State is the state recorded for a User held in the state store.
type State int

// The State values start at one, so the zero value of State is not one of
// the named states.
const (
	// Discovered is set by storeUser for a user not previously in the store.
	Discovered State = iota + 1
	// Modified is set by storeUser for a user already in the store.
	Modified
	// Deleted marks a user that is removed from the database when the
	// stateStore is closed with commit set.
	Deleted
)

// User is an Active Directory entry together with its State. Users are
// decoded from JSON when they are loaded from the users bucket.
type User struct {
	activedirectory.Entry `json:"activedirectory"`
	State                 State `json:"state"`
}

// stateStore wraps a kvstore.Transaction and provides convenience methods for
// accessing and storing relevant data within the kvstore database.
type stateStore struct {
	tx *kvstore.Transaction

	// whenChanged is the last whenChanged time in the set of
	// users and their associated groups.
	whenChanged time.Time

	// lastSync and lastUpdate are the times of the first update
	// or sync operation of users/groups. They are loaded from, and
	// saved under, lastSyncKey and lastUpdateKey in the state bucket.
	lastSync   time.Time
	lastUpdate time.Time

	// users holds the staged users, keyed by user ID.
	users map[string]*User
}

// newStateStore creates a new instance of stateStore. It will open a new write
// transaction on the kvstore and load values from the database. Since this
// opens a write transaction, only one instance of stateStore may be created
// at a time. The close function must be called to release the transaction lock
// on the kvstore database.
func newStateStore(store *kvstore.Store) (*stateStore, error) { tx, err := store.BeginTx(true) if err != nil { return nil, fmt.Errorf("unable to open state store transaction: %w", err) } s := stateStore{ users: make(map[string]*User), tx: tx, } err = s.tx.Get(stateBucket, lastSyncKey, &s.lastSync) if err != nil && !errIsItemNotFound(err) { return nil, fmt.Errorf("unable to get last sync time from state: %w", err) } err = s.tx.Get(stateBucket, lastUpdateKey, &s.lastUpdate) if err != nil && !errIsItemNotFound(err) { return nil, fmt.Errorf("unable to get last update time from state: %w", err) } err = s.tx.Get(stateBucket, whenChangedKey, &s.whenChanged) if err != nil && !errIsItemNotFound(err) { return nil, fmt.Errorf("unable to get last change time from state: %w", err) } err = s.tx.ForEach(usersBucket, func(key, value []byte) error { var u User err = json.Unmarshal(value, &u) if err != nil { return fmt.Errorf("unable to unmarshal user from state: %w", err) } s.users[u.ID] = &u return nil }) if err != nil && !errIsItemNotFound(err) { return nil, fmt.Errorf("unable to get users from state: %w", err) } return &s, nil } // storeUser stores a user. If the user does not exist in the store, then the // user will be marked as discovered. Otherwise, the user will be marked // as modified. func (s *stateStore) storeUser(u activedirectory.Entry) *User { su := User{Entry: u} if existing, ok := s.users[u.ID]; ok { su.State = Modified *existing = su } else { su.State = Discovered s.users[u.ID] = &su } return &su } // len returns the number of user entries in the state store. func (s *stateStore) len() int { return len(s.users) } // forEach iterates over all users in the state store. Changes to the // User's fields will be reflected in the state store. func (s *stateStore) forEach(fn func(*User)) { for _, u := range s.users { fn(u) } } // close will close out the stateStore. 
If commit is true, the staged values on the // stateStore will be set in the kvstore database, and the transaction will be // committed. Otherwise, all changes will be discarded and the transaction will // be rolled back. The stateStore must NOT be used after close is called, rather, // a new stateStore should be created. func (s *stateStore) close(commit bool) (err error) { if !commit { return s.tx.Rollback() } // Fallback in case one of the statements below fails. If everything is // successful and Commit is called, then this call to Rollback will be a no-op. defer func() { if err == nil { return } rollbackErr := s.tx.Rollback() if rollbackErr == nil { err = fmt.Errorf("multiple errors during statestore close: %w", errors.Join(err, rollbackErr)) } }() if !s.lastSync.IsZero() { err = s.tx.Set(stateBucket, lastSyncKey, &s.lastSync) if err != nil { return fmt.Errorf("unable to save last sync time to state: %w", err) } } if !s.lastUpdate.IsZero() { err = s.tx.Set(stateBucket, lastUpdateKey, &s.lastUpdate) if err != nil { return fmt.Errorf("unable to save last update time to state: %w", err) } } if !s.whenChanged.IsZero() { err = s.tx.Set(stateBucket, whenChangedKey, &s.whenChanged) if err != nil { return fmt.Errorf("unable to save last change time to state: %w", err) } } for key, value := range s.users { if value.State == Deleted { err = s.tx.Delete(usersBucket, []byte(key)) if err != nil { return fmt.Errorf("unable to delete user %q from state: %w", key, err) } continue } err = s.tx.Set(usersBucket, []byte(key), value) if err != nil { return fmt.Errorf("unable to save user %q to state: %w", key, err) } } return s.tx.Commit() } // getLastSync retrieves the last full synchronization time from the kvstore // database. If the value doesn't exist, a zero time.Time is returned. 
func getLastSync(store *kvstore.Store) (time.Time, error) { var t time.Time err := store.RunTransaction(false, func(tx *kvstore.Transaction) error { return tx.Get(stateBucket, lastSyncKey, &t) }) return t, err } // getLastUpdate retrieves the last incremental update time from the kvstore // database. If the value doesn't exist, a zero time.Time is returned. func getLastUpdate(store *kvstore.Store) (time.Time, error) { var t time.Time err := store.RunTransaction(false, func(tx *kvstore.Transaction) error { return tx.Get(stateBucket, lastUpdateKey, &t) }) return t, err } // errIsItemNotFound returns true if the error represents an item not found // error (bucket not found or key not found). func errIsItemNotFound(err error) bool { return errors.Is(err, kvstore.ErrBucketNotFound) || errors.Is(err, kvstore.ErrKeyNotFound) }