lib/storage/memory_storage.go (408 lines of code) (raw):

// Copyright 2019 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package storage import ( "context" "fmt" "io" "os" "path/filepath" "regexp" "time" "google.golang.org/grpc/codes" /* copybara-comment */ "google.golang.org/grpc/status" /* copybara-comment */ "github.com/golang/protobuf/jsonpb" /* copybara-comment */ "github.com/golang/protobuf/proto" /* copybara-comment */ ) const ( memStorageType = "memory" memStorageVersion = "v0" ) // MemoryStorage is designed as a single threading storage. Will throw exception if multiple TX request. type MemoryStorage struct { service string path string pathParts []string cache *StorageCache fs *FileStorage deleted map[string]bool wipedRealms map[string]bool lock chan bool lastLock time.Time } func NewMemoryStorage(service, path string) *MemoryStorage { return &MemoryStorage{ service: service, path: path, cache: NewStorageCache(), fs: NewFileStorage(service, path), deleted: make(map[string]bool), wipedRealms: make(map[string]bool), lock: make(chan bool, 1), lastLock: time.Unix(0, 0), } } func (m *MemoryStorage) Info() map[string]string { return map[string]string{ "type": memStorageType, "version": memStorageVersion, "service": m.service, "path": m.path, } } func (m *MemoryStorage) Exists(datatype, realm, user, id string, rev int64) (bool, error) { fname := m.fname(datatype, realm, user, id, rev) if _, ok := m.cache.GetEntity(fname); ok { return true, nil } if m.deleted[fname] || m.wipedRealms[realm] { return false, nil } return m.fs.Exists(datatype, realm, user, id, rev) } func (m *MemoryStorage) Read(datatype, realm, user, id string, rev int64, content proto.Message) error { return m.ReadTx(datatype, realm, user, id, rev, content, nil) } // ReadTx reads inside a transaction. func (m *MemoryStorage) ReadTx(datatype, realm, user, id string, rev int64, content proto.Message, tx Tx) (ferr error) { if tx == nil { var err error tx, err = m.Tx(false) if err != nil { return err } defer func() { err := tx.Finish() if ferr == nil { ferr = err } }() } fname := m.fname(datatype, realm, user, id, rev) if data, ok := m.cache.GetEntity(fname); ok { content.Reset() proto.Merge(content, data) return nil } if m.deleted[fname] || m.wipedRealms[realm] { return fmt.Errorf("not found: %q", fname) } if err := m.fs.ReadTx(datatype, realm, user, id, rev, content, tx); err != nil { return err } m.cache.PutEntity(fname, content) return nil } // MultiReadTx reads a set of objects matching the input parameters and filters func (m *MemoryStorage) MultiReadTx(datatype, realm, user, id string, filters [][]Filter, offset, pageSize int, typ proto.Message, tx Tx) (_ *Results, ferr error) { if tx == nil { var err error tx, err = m.fs.Tx(false) if err != nil { return nil, fmt.Errorf("file read lock error: %v", err) } defer func() { err := tx.Finish() if ferr == nil { ferr = err } }() } if pageSize > MaxPageSize { pageSize = MaxPageSize } results := NewResults() err := m.findPath(datatype, realm, user, id, typ, func(path, userMatch, idMatch string, p proto.Message) error { if m.deleted[m.fname(datatype, realm, userMatch, idMatch, LatestRev)] || m.wipedRealms[realm] { return nil } if id != MatchAllIDs && idMatch != id { return nil } if !MatchProtoFilters(filters, p) { return nil } if offset > 0 { offset-- return nil } if pageSize > results.MatchCount { results.Entries = append(results.Entries, &Entry{ Realm: realm, GroupID: userMatch, ItemID: idMatch, Item: p, }) } results.MatchCount++ return nil }) return results, err } func (m *MemoryStorage) findPath(datatype, realm, user, id string, typ proto.Message, fn func(string, string, string, proto.Message) error) error { searchUser := user if user == MatchAllUsers { searchUser = "(.*)" } else { searchUser = "(" + user + ")" } searchRealm := realm if realm == AllRealms { searchRealm = "(.*)" } searchID := id if id == MatchAllIDs { searchID = "(.*)" } else { searchID = "(" + id + ")" } extractID := m.fs.fname(datatype, searchRealm, searchUser, searchID, LatestRev) re, err := regexp.Compile(extractID) if err != nil { return fmt.Errorf("file extract ID %q regexp error: %v", extractID, err) } defaultUserID := m.fs.fname(datatype, realm, DefaultUser, searchID, LatestRev) dure, err := regexp.Compile(defaultUserID) if err != nil { return fmt.Errorf("file extract ID %q regexp error: %v", defaultUserID, err) } cached := m.cache.Entities() fileMatcher := func(path string, info os.FileInfo, err error) error { return extractFromPath(re, dure, user, path, info, err, typ, cached, fn) } if err = filepath.Walk(m.fs.path, fileMatcher); err != nil { return err } return extractFromCache(re, dure, user, cached, fn) } func extractFromPath(re, dure *regexp.Regexp, user, path string, info os.FileInfo, err error, typ proto.Message, cached map[string]proto.Message, fn func(string, string, string, proto.Message) error) error { if err != nil { return err } if info.IsDir() { return nil } if _, ok := cached[path]; ok { return nil } userMatch, idMatch := extractUserAndID(re, dure, user, path) if userMatch == "" && idMatch == "" { return nil } var p proto.Message if typ != nil { file, err := os.Open(path) if err != nil { return fmt.Errorf("file %q I/O error: %v", path, err) } defer file.Close() p = proto.Clone(typ) if err := jsonpb.Unmarshal(file, p); err != nil && err != io.EOF { return fmt.Errorf("file %q invalid JSON: %v", path, err) } } return fn(path, userMatch, idMatch, p) } func extractUserAndID(re, dure *regexp.Regexp, user, path string) (string, string) { matches := re.FindStringSubmatch(path) if len(matches) == 3 { return matches[1], matches[2] } if user == DefaultUser { matches = dure.FindStringSubmatch(path) if len(matches) == 2 { return DefaultUser, matches[1] } } return "", "" } func extractFromCache(re, dure *regexp.Regexp, user string, cached map[string]proto.Message, fn func(string, string, string, proto.Message) error) error { for path, content := range cached { if !re.MatchString(path) && !dure.MatchString(path) { continue } userMatch, idMatch := extractUserAndID(re, dure, user, path) if userMatch == "" && idMatch == "" { continue } if err := fn(path, userMatch, idMatch, content); err != nil { return err } } return nil } func (m *MemoryStorage) ReadHistory(datatype, realm, user, id string, content *[]proto.Message) error { return m.ReadHistoryTx(datatype, realm, user, id, content, nil) } // ReadHistoryTx reads history inside a transaction. func (m *MemoryStorage) ReadHistoryTx(datatype, realm, user, id string, content *[]proto.Message, tx Tx) (ferr error) { if tx == nil { var err error tx, err = m.Tx(false) if err != nil { return err } defer func() { err := tx.Finish() if ferr == nil { ferr = err } }() } hfname := m.historyName(datatype, realm, user, id) if data, ok := m.cache.GetHistory(hfname); ok { for _, he := range data { *content = append(*content, he) } return nil } if err := m.fs.ReadHistoryTx(datatype, realm, user, id, content, tx); err != nil { return err } m.cache.PutHistory(hfname, *content) return nil } func (m *MemoryStorage) Write(datatype, realm, user, id string, rev int64, content proto.Message, history proto.Message) error { return m.WriteTx(datatype, realm, user, id, rev, content, history, nil) } // WriteTx writes inside a transaction. func (m *MemoryStorage) WriteTx(datatype, realm, user, id string, rev int64, content proto.Message, history proto.Message, tx Tx) (ferr error) { if tx == nil { var err error tx, err = m.Tx(true) if err != nil { return err } defer func() { err := tx.Finish() if ferr == nil { ferr = err } }() } hlist := make([]proto.Message, 0) if err := m.ReadHistoryTx(datatype, realm, user, id, &hlist, tx); err != nil && !ErrNotFound(err) { return err } hlist = append(hlist, history) hfname := m.historyName(datatype, realm, user, id) m.cache.PutHistory(hfname, hlist) vname := m.fname(datatype, realm, user, id, rev) m.cache.PutEntity(vname, content) lname := m.fname(datatype, realm, user, id, LatestRev) m.cache.PutEntity(lname, content) if _, ok := m.deleted[vname]; ok { delete(m.deleted, vname) } if _, ok := m.deleted[lname]; ok { delete(m.deleted, lname) } return nil } // Delete a record. func (m *MemoryStorage) Delete(datatype, realm, user, id string, rev int64) error { return m.DeleteTx(datatype, realm, user, id, rev, nil) } // DeleteTx delete a record with transaction. func (m *MemoryStorage) DeleteTx(datatype, realm, user, id string, rev int64, tx Tx) (ferr error) { if tx == nil { var err error tx, err = m.Tx(true) if err != nil { return err } defer func() { err := tx.Finish() if ferr == nil { ferr = err } }() } exists, err := m.Exists(datatype, realm, user, id, rev) if err != nil { return err } lname := m.fname(datatype, realm, user, id, LatestRev) if !exists { return status.Errorf(codes.NotFound, "not found: %q", lname) } vname := m.fname(datatype, realm, user, id, rev) m.cache.DeleteEntity(vname) m.cache.DeleteEntity(lname) m.deleted[vname] = true m.deleted[lname] = true return nil } // MultiDeleteTx deletes all records of a certain data type within a realm. func (m *MemoryStorage) MultiDeleteTx(datatype, realm, user string, tx Tx) (ferr error) { if tx == nil { var err error tx, err = m.fs.Tx(false) if err != nil { return fmt.Errorf("file read lock error: %v", err) } defer func() { err := tx.Finish() if ferr == nil { ferr = err } }() } return m.findPath(datatype, realm, user, MatchAllIDs, nil, func(path, userMatch, idMatch string, p proto.Message) error { return m.DeleteTx(datatype, realm, userMatch, idMatch, LatestRev, tx) }) } func (m *MemoryStorage) Wipe(ctx context.Context, realm string, batchNum, maxEntries int) (int, error) { // Wipe everything, not just for the realm provided or the maxEntries. count := len(m.cache.entityCache) + len(m.cache.historyCache) m.cache = NewStorageCache() m.deleted = make(map[string]bool) m.wipedRealms[realm] = true return count, nil } func (m *MemoryStorage) Tx(update bool) (Tx, error) { select { case m.lock <- true: default: panic("MAYBE BUG: Requested a new TX without the existing TX release.") } m.cache.Backup() return &MemTx{ update: update, ms: m, }, nil } // LockTx returns a storage-wide lock by the given name. Only one such lock should // be requested at a time. If Tx is provided, it must be an update Tx. func (m *MemoryStorage) LockTx(lockName string, minFrequency time.Duration, tx Tx) Tx { now := time.Now() if now.Sub(m.lastLock) < minFrequency { return nil } if tx == nil { var err error tx, err = m.Tx(true) if err != nil { return nil } } m.lastLock = now return tx } type MemTx struct { update bool ms *MemoryStorage } // Finish attempts to commit a transaction. func (tx *MemTx) Finish() error { select { case <-tx.ms.lock: default: panic("MAYBE BUG: Releasing a released TX.") } return nil } // Rollback attempts to rollback a transaction. func (tx *MemTx) Rollback() error { tx.ms.cache.Restore() tx.ms.fs = NewFileStorage(tx.ms.service, tx.ms.path) return nil } // MakeUpdate will upgrade a read-only transaction to an update transaction. func (tx *MemTx) MakeUpdate() error { tx.update = true return nil } func (tx *MemTx) IsUpdate() bool { return tx.update } func (m *MemoryStorage) fname(datatype, realm, user, id string, rev int64) string { return m.fs.fname(datatype, realm, user, id, rev) } func (m *MemoryStorage) historyName(datatype, realm, user, id string) string { return m.fs.historyName(datatype, realm, user, id) }