pkg/storage/storage.go (75 lines of code) (raw):

// Copyright (c) Facebook, Inc. and its affiliates. // // This source code is licensed under the MIT license found in the // LICENSE file in the root directory of this source tree. package storage import ( "fmt" "github.com/facebookincubator/contest/pkg/config" "github.com/facebookincubator/contest/pkg/xcontext" ) // storage defines the storage engine used by ConTest. It can be overridden // via the exported function SetStorage. var storage Storage var storageAsync Storage // ConsistencyModel hints at whether queries should go to the primary database // or any available replica (in which case, the guarantee is eventual consistency) type ConsistencyModel int const ( ConsistentReadAfterWrite ConsistencyModel = iota ConsistentEventually ) const consistencyModelKey = "storage_consistency_model" // Storage defines the interface that storage engines must implement type Storage interface { JobStorage EventStorage // Close flushes and releases resources associated with the storage engine. Close() error // Version returns the version of the storage being used Version() (uint64, error) } // TransactionalStorage is implemented by storage backends that support transactions. // Only default isolation level is supported. type TransactionalStorage interface { Storage BeginTx() (TransactionalStorage, error) Commit() error Rollback() error } // ResettableStorage is implemented by storage engines that support reset operation type ResettableStorage interface { Storage Reset() error } // SetStorage sets the desired storage engine for events. Switching to a new // storage engine implies garbage collecting the old one, with possible loss of // pending events if not flushed correctly func SetStorage(storageEngine Storage) error { if storageEngine != nil { v, err := storageEngine.Version() if err != nil { return fmt.Errorf("could not determine storage version: %w", err) } if v < config.MinStorageVersion { return fmt.Errorf("could not configure storage of type %T (minimum storage version: %d, current storage version: %d)", storageEngine, config.MinStorageVersion, v) } } storage = storageEngine return nil } // GetStorage returns the primary storage for events. func GetStorage() (Storage, error) { if storage == nil { return nil, fmt.Errorf("no storage engine assigned") } return storage, nil } // SetAsyncStorage sets the desired storage engine for read-only events. Switching to a new // storage engine implies garbage collecting the old one, with possible loss of // pending events if not flushed correctly func SetAsyncStorage(storageEngine Storage) error { if storageEngine != nil { v, err := storageEngine.Version() if err != nil { return fmt.Errorf("could not determine storage version: %w", err) } if v < config.MinStorageVersion { return fmt.Errorf("could not configure storage of type %T (minimum storage version: %d, current storage version: %d)", storageEngine, config.MinStorageVersion, v) } } storageAsync = storageEngine return nil } func isStronglyConsistent(ctx xcontext.Context) bool { value := ctx.Value(consistencyModelKey) ctx.Debugf("consistency model check: %v", value) switch model := value.(type) { case ConsistencyModel: return model == ConsistentReadAfterWrite default: return true } } func WithConsistencyModel(ctx xcontext.Context, model ConsistencyModel) xcontext.Context { return xcontext.WithValue(ctx, consistencyModelKey, model) }