internal/pkg/agent/storage/store/state_store.go (255 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package store

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"reflect"
	"sync"

	"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
	"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
	"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker"
	"github.com/elastic/elastic-agent/pkg/core/logger"
)

// Version is the current StateStore version. If any breaking change is
// introduced, it should be increased and a migration added.
//
// NewStateStore refuses to load a persisted state whose "version" field
// differs from this value.
const Version = "1"

// saver is the write half of the backing storage: Save receives the
// JSON-serialized state as an io.Reader.
type saver interface {
	Save(io.Reader) error
}

// saveLoader is the storage abstraction used by StateStore. In addition to
// saving, Load returns the previously stored content; the caller is
// responsible for closing the returned io.ReadCloser.
type saveLoader interface {
	saver
	Load() (io.ReadCloser, error)
}

// StateStore stores the agent state:
//   - the last fleet action (not all actions are stored, refer to SetAction
//     and Save for details)
//   - a queue of scheduled actions
//   - the ack token
//
// See each method documentation for details.
type StateStore struct {
	log   *logger.Logger
	store saveLoader
	// dirty is set by the setters when the in-memory state changes and is
	// cleared by Save when it returns without error.
	dirty bool
	state state

	// mx guards dirty and state; every method of StateStore takes it.
	mx sync.RWMutex
}

// state is the JSON document persisted by StateStore. The field order and the
// struct tags define the on-disk layout.
type state struct {
	Version          string           `json:"version"`
	ActionSerializer actionSerializer `json:"action,omitempty"`
	AckToken         string           `json:"ack_token,omitempty"`
	Queue            actionQueue      `json:"action_queue,omitempty"`
}

// actionSerializer is JSON Marshaler/Unmarshaler for fleetapi.Action.
//
// The embedded json.Marshaler and json.Unmarshaler interfaces are not assigned
// anywhere in the code visible here; the actual (de)serialization is done by
// the MarshalJSON and UnmarshalJSON methods declared on *actionSerializer.
type actionSerializer struct {
	json.Marshaler
	json.Unmarshaler

	Action fleetapi.Action
}

// actionQueue stores the scheduled actions to be executed. The dedicated type
// is needed to make it possible to marshal and unmarshal
// fleetapi.ScheduledActions: the fleetapi package marshals/unmarshals
// fleetapi.Actions and therefore does not need to handle
// fleetapi.ScheduledAction separately, but the store does.
type actionQueue []fleetapi.ScheduledAction // NewStateStoreWithMigration creates a new state store and migrates the old ones. func NewStateStoreWithMigration( ctx context.Context, log *logger.Logger, actionStorePath, stateStorePath string, storageOpts ...storage.EncryptedOptionFunc) (*StateStore, error) { stateDiskStore, err := storage.NewEncryptedDiskStore( ctx, stateStorePath, storageOpts...) if err != nil { return nil, fmt.Errorf( "could not create EncryptedDiskStore when creating StateStoreWithMigration: %w", err) } return newStateStoreWithMigration(log, actionStorePath, stateDiskStore) } func newStateStoreWithMigration( log *logger.Logger, actionStorePath string, stateStore storage.Storage) (*StateStore, error) { err := migrateActionStoreToStateStore(log, actionStorePath, stateStore) if err != nil { return nil, fmt.Errorf("failed migrating action store to YAML state store: %w", err) } err = migrateYAMLStateStoreToStateStoreV1(log, stateStore) if err != nil { return nil, fmt.Errorf("failed migrating YAML store JSON store: %w", err) } return NewStateStore(log, stateStore) } // NewStateStoreActionAcker creates a new state store backed action acker. func NewStateStoreActionAcker(acker acker.Acker, store *StateStore) *StateStoreActionAcker { return &StateStoreActionAcker{acker: acker, store: store} } // NewStateStore creates a new state store. func NewStateStore(log *logger.Logger, store saveLoader) (*StateStore, error) { // If the store exists we will read it, if an error is returned we log it // and return an empty store. 
reader, err := store.Load() if err != nil { log.Warnf("failed to load state store, returning empty contents: %v", err.Error()) return &StateStore{log: log, store: store}, nil } defer reader.Close() st, err := readState(reader) if err != nil { return nil, fmt.Errorf("could not parse store content: %w", err) } if st.Version != Version { return nil, fmt.Errorf( "invalid state store version, current version is %q loaded store verion is %q", Version, st.Version) } return &StateStore{ log: log, store: store, state: st, }, nil } // readState parsed the content from reader as JSON to state. // It's mostly to abstract the parsing of the data so different functions can // reuse this. func readState(reader io.ReadCloser) (state, error) { st := state{} data, err := io.ReadAll(reader) if err != nil { return state{}, fmt.Errorf("could not read store state: %w", err) } if len(data) == 0 { // empty file return state{Version: "1"}, nil } err = json.Unmarshal(data, &st) if err != nil { return state{}, fmt.Errorf("could not parse JSON: %w", err) } return st, nil } // SetAction sets the current action. It accepts ActionPolicyChange or // ActionUnenroll. Any other type will be silently discarded. func (s *StateStore) SetAction(a fleetapi.Action) { s.mx.Lock() defer s.mx.Unlock() // the reflect.ValueOf(v).IsNil() is required as the type of 'v' on switch // clause with multiple types will, in this case, preserve the original type. // See details on https://go.dev/ref/spec#Type_switches // Without using reflect accessing the concrete type stored in the interface // isn't possible and as a is of type fleetapi.Action and has a concrete // value, a is never nil, neither v is nil as it has the same type of a // on both clauses. if a == nil || reflect.ValueOf(a).IsNil() { s.log.Debugf("trying to set an nil '%T' action, ignoring the action", a) return } switch v := a.(type) { // If any new action type is added, don't forget to update the method's // description. 
case *fleetapi.ActionPolicyChange, *fleetapi.ActionUnenroll: // Only persist the action if the action is different. if s.state.ActionSerializer.Action != nil && s.state.ActionSerializer.Action.ID() == v.ID() { return } s.dirty = true s.state.ActionSerializer.Action = a default: s.log.Debugw("trying to set invalid action type on the state store, ignoring the action", "action.type", a.Type(), "action.id", a.ID()) } } // SetAckToken set ack token to the agent state func (s *StateStore) SetAckToken(ackToken string) { s.mx.Lock() defer s.mx.Unlock() if s.state.AckToken == ackToken { return } s.dirty = true s.state.AckToken = ackToken } // SetQueue sets the action_queue to agent state func (s *StateStore) SetQueue(q []fleetapi.ScheduledAction) { s.mx.Lock() defer s.mx.Unlock() s.state.Queue = q s.dirty = true } // Save saves the actions into the state store. If the action type is not // supported or if any error happens, it returns a non-nil error. func (s *StateStore) Save() (err error) { s.mx.Lock() defer s.mx.Unlock() defer func() { if err == nil { s.dirty = false } }() if !s.dirty { return nil } var reader io.Reader switch a := s.state.ActionSerializer.Action.(type) { case *fleetapi.ActionPolicyChange, *fleetapi.ActionUnenroll, nil: // ok default: return fmt.Errorf("incompatible type, expected ActionPolicyChange, "+ "ActionUnenroll or nil, but received %T", a) } reader, err = jsonToReader(&s.state) if err != nil { return err } if err := s.store.Save(reader); err != nil { return err } s.log.Debugf("save state on disk : %+v", s.state) return nil } // Queue returns a copy of the queue func (s *StateStore) Queue() []fleetapi.ScheduledAction { s.mx.RLock() defer s.mx.RUnlock() q := make([]fleetapi.ScheduledAction, len(s.state.Queue)) copy(q, s.state.Queue) return q } // Action the action to execute. See SetAction for the possible action types. 
func (s *StateStore) Action() fleetapi.Action { s.mx.RLock() defer s.mx.RUnlock() if s.state.ActionSerializer.Action == nil { return nil } return s.state.ActionSerializer.Action } // AckToken return the agent state persisted ack_token func (s *StateStore) AckToken() string { s.mx.RLock() defer s.mx.RUnlock() return s.state.AckToken } // StateStoreActionAcker wraps an existing acker and will set any acked event // in the state store. It's up to the state store to decide if we need to // persist the event for future replay or just discard the event. type StateStoreActionAcker struct { acker acker.Acker store *StateStore } // Ack acks the action using underlying acker. // After the action is acked it is stored in the StateStore. The StateStore // decides if the action needs to be persisted or not. func (a *StateStoreActionAcker) Ack(ctx context.Context, action fleetapi.Action) error { if err := a.acker.Ack(ctx, action); err != nil { return err } a.store.SetAction(action) return a.store.Save() } // Commit commits acks. 
func (a *StateStoreActionAcker) Commit(ctx context.Context) error { return a.acker.Commit(ctx) } func (as *actionSerializer) MarshalJSON() ([]byte, error) { return json.Marshal(as.Action) } func (as *actionSerializer) UnmarshalJSON(data []byte) error { var typeUnmarshaler struct { Type string `json:"type,omitempty" yaml:"type,omitempty"` } err := json.Unmarshal(data, &typeUnmarshaler) if err != nil { return err } as.Action = fleetapi.NewAction(typeUnmarshaler.Type) err = json.Unmarshal(data, &as.Action) if err != nil { return err } return nil } func (aq *actionQueue) UnmarshalJSON(data []byte) error { actions := fleetapi.Actions{} err := json.Unmarshal(data, &actions) if err != nil { return fmt.Errorf("actionQueue failed to unmarshal: %w", err) } var scheduledActions []fleetapi.ScheduledAction for _, a := range actions { sa, ok := a.(fleetapi.ScheduledAction) if !ok { return fmt.Errorf("actionQueue: action %s isn't a ScheduledAction, "+ "cannot unmarshal it to actionQueue", a.Type()) } scheduledActions = append(scheduledActions, sa) } *aq = scheduledActions return nil } func jsonToReader(in interface{}) (io.Reader, error) { data, err := json.Marshal(in) if err != nil { return nil, fmt.Errorf("could not marshal to JSON: %w", err) } return bytes.NewReader(data), nil }