pkg/storage/events.go (104 lines of code) (raw):

// Copyright (c) Facebook, Inc. and its affiliates. // // This source code is licensed under the MIT license found in the // LICENSE file in the root directory of this source tree. package storage import ( "fmt" "time" "github.com/facebookincubator/contest/pkg/event" "github.com/facebookincubator/contest/pkg/event/frameworkevent" "github.com/facebookincubator/contest/pkg/event/testevent" "github.com/facebookincubator/contest/pkg/xcontext" ) type EventStorage interface { // Test events storage interface StoreTestEvent(ctx xcontext.Context, event testevent.Event) error GetTestEvents(ctx xcontext.Context, eventQuery *testevent.Query) ([]testevent.Event, error) // Framework events storage interface StoreFrameworkEvent(ctx xcontext.Context, event frameworkevent.Event) error GetFrameworkEvent(ctx xcontext.Context, eventQuery *frameworkevent.Query) ([]frameworkevent.Event, error) } // TestEventEmitter implements Emitter interface from the testevent package type TestEventEmitter struct { header testevent.Header // allowedEvents restricts the events this emitter will accept, if set allowedEvents *map[event.Name]bool } // TestEventFetcher implements the Fetcher interface from the testevent package type TestEventFetcher struct { } // TestEventEmitterFetcher implements Emitter and Fetcher interface of the testevent package type TestEventEmitterFetcher struct { TestEventEmitter TestEventFetcher } // Emit emits an event using the selected storage layer func (e TestEventEmitter) Emit(ctx xcontext.Context, data testevent.Data) error { if e.allowedEvents != nil { if _, ok := (*e.allowedEvents)[data.EventName]; !ok { return fmt.Errorf("teststep %s is not allowed to emit unregistered event %s", e.header.TestName, data.EventName) } } event := testevent.Event{Header: &e.header, Data: &data, EmitTime: time.Now()} if err := storage.StoreTestEvent(ctx, event); err != nil { return fmt.Errorf("could not persist event data %v: %v", data, err) } return nil } // Fetch retrieves events based on QueryFields that are used to build a Query object for TestEvents func (ev TestEventFetcher) Fetch(ctx xcontext.Context, queryFields ...testevent.QueryField) ([]testevent.Event, error) { eventQuery, err := testevent.QueryFields(queryFields).BuildQuery() if err != nil { return nil, fmt.Errorf("unable to build a query: %w", err) } if isStronglyConsistent(ctx) { return storage.GetTestEvents(ctx, eventQuery) } return storageAsync.GetTestEvents(ctx, eventQuery) } // NewTestEventEmitter creates a new Emitter object associated with a Header func NewTestEventEmitter(header testevent.Header) testevent.Emitter { return TestEventEmitter{header: header} } // NewTestEventEmitterWithAllowedEvents creates a new Emitter object associated with a Header func NewTestEventEmitterWithAllowedEvents(header testevent.Header, allowedEvents *map[event.Name]bool) testevent.Emitter { return TestEventEmitter{header: header, allowedEvents: allowedEvents} } // NewTestEventFetcher creates a new Fetcher object associated with a Header func NewTestEventFetcher() testevent.Fetcher { return TestEventFetcher{} } // NewTestEventEmitterFetcher creates a new EmitterFetcher object associated with a Header func NewTestEventEmitterFetcher(header testevent.Header) testevent.EmitterFetcher { return TestEventEmitterFetcher{ TestEventEmitter{header: header}, TestEventFetcher{}, } } // NewTestEventEmitterFetcherWithAllowedEvents creates a new EmitterFetcher object associated with a Header func NewTestEventEmitterFetcherWithAllowedEvents(header testevent.Header, allowedEvents *map[event.Name]bool) testevent.EmitterFetcher { return TestEventEmitterFetcher{ TestEventEmitter{header: header}, TestEventFetcher{}, } } // FrameworkEventEmitter implements Emitter interface from the frameworkevent package type FrameworkEventEmitter struct { } // FrameworkEventFetcher implements the Fetcher interface from the frameworkevent package type FrameworkEventFetcher struct { } // FrameworkEventEmitterFetcher implements Emitter and Fetcher interface from the frameworkevent package type FrameworkEventEmitterFetcher struct { FrameworkEventEmitter FrameworkEventFetcher } // Emit emits an event using the selected storage engine func (ev FrameworkEventEmitter) Emit(ctx xcontext.Context, event frameworkevent.Event) error { if err := storage.StoreFrameworkEvent(ctx, event); err != nil { return fmt.Errorf("could not persist event %v: %v", event, err) } return nil } // Fetch retrieves events based on QueryFields that are used to build a Query object for FrameworkEvents func (ev FrameworkEventFetcher) Fetch(ctx xcontext.Context, queryFields ...frameworkevent.QueryField) ([]frameworkevent.Event, error) { eventQuery, err := frameworkevent.QueryFields(queryFields).BuildQuery() if err != nil { return nil, fmt.Errorf("unable to build a query: %w", err) } if isStronglyConsistent(ctx) { return storage.GetFrameworkEvent(ctx, eventQuery) } return storageAsync.GetFrameworkEvent(ctx, eventQuery) } // NewFrameworkEventEmitter creates a new Emitter object for framework events func NewFrameworkEventEmitter() FrameworkEventEmitter { return FrameworkEventEmitter{} } // NewFrameworkEventFetcher creates a new Fetcher object for framework events func NewFrameworkEventFetcher() FrameworkEventFetcher { return FrameworkEventFetcher{} } // NewFrameworkEventEmitterFetcher creates a new EmitterFetcher object for framework events func NewFrameworkEventEmitterFetcher() FrameworkEventEmitterFetcher { return FrameworkEventEmitterFetcher{ FrameworkEventEmitter{}, FrameworkEventFetcher{}, } }