lib/dsstore/datastore.go:

// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package dsstore is a Datastore-based storage for DAM/IC.
package dsstore

import (
	"context"
	"fmt"
	"math/rand"
	"strings"
	"time"

	glog "github.com/golang/glog" /* copybara-comment */
	"cloud.google.com/go/datastore" /* copybara-comment */
	"google.golang.org/api/iterator" /* copybara-comment: iterator */
	"google.golang.org/grpc/codes" /* copybara-comment */
	"google.golang.org/grpc/status" /* copybara-comment */
	"github.com/golang/protobuf/jsonpb" /* copybara-comment */
	"github.com/golang/protobuf/proto" /* copybara-comment */
	"github.com/GoogleCloudPlatform/healthcare-federated-access-services/lib/storage" /* copybara-comment: storage */

	cpb "github.com/GoogleCloudPlatform/healthcare-federated-access-services/proto/common/v1" /* copybara-comment: go_proto */
)

// storageVersion is the version of the store data model.
// If there is a breaking change to the store data model, the version needs to
// be updated.
const (
	storageType    = "gcpDatastore"
	storageVersion = "v0"
	metaVersion    = "version"

	maxRowsPerBatchOperation = 50000 // never exceed this number of rows without a LRO
)

// Data

var (
	entityKind  = "entity"
	historyKind = "history"
	metaKind    = "meta"
)

// Key is the key for items.
type Key struct {
	Datatype string `datastore:"type"`
	Realm    string `datastore:"realm"`
	User     string `datastore:"user_id"`
	ID       string `datastore:"id"`
	Rev      int64  `datastore:"rev"`
}

// Store is a datastore based implementation of storage.
type Store struct {
	client *datastore.Client

	// TODO: these fields are only used for Info and are not related to the store.
	// Move them to lib/serviceinfo.
	// project: the GCP project in which the datastore resides.
	project string
	// service: the name of the service (e.g. "dam" or "ic").
	service string
	// path: the path to the config file.
	path string
}

// Entity is a datastore entity for data.
type Entity struct {
	Key      *datastore.Key `datastore:"__key__"`
	Service  string         `datastore:"service"`
	Datatype string         `datastore:"type"`
	Realm    string         `datastore:"realm"`
	User     string         `datastore:"user_id"`
	ID       string         `datastore:"id"`
	Rev      int64          `datastore:"rev"`
	Version  string         `datastore:"version,noindex"`
	Modified int64          `datastore:"modified"`
	Content  string         `datastore:"content,noindex"`
}

// History is a datastore entity for history.
type History struct {
	Key      *datastore.Key `datastore:"__key__"`
	Service  string         `datastore:"service"`
	Datatype string         `datastore:"type"`
	Realm    string         `datastore:"realm"`
	User     string         `datastore:"user_id"`
	ID       string         `datastore:"id"`
	Rev      int64          `datastore:"rev"`
	Version  string         `datastore:"version,noindex"`
	Modified int64          `datastore:"modified"`
	Content  string         `datastore:"content,noindex"`
}

// Meta is a datastore entity for meta.
type Meta struct {
	Key   *datastore.Key `datastore:"__key__"`
	Name  string         `datastore:"name"`
	Value string         `datastore:"value,noindex"`
}
// NewStore creates a new datastore storage and initializes it.
// TODO: create the client for datastore in the main and inject it.
func NewStore(ctx context.Context, project, service, path string) *Store {
	client, err := datastore.NewClient(ctx, project)
	if err != nil {
		glog.Fatalf("cannot initialize datastore: %v", err)
	}
	s := New(client, project, service, path)
	if err := s.Init(context.Background()); err != nil {
		glog.Fatalf("Datastore failed to initialize: %v", err)
	}
	return s
}

// New creates a new storage.
func New(client *datastore.Client, project, service, path string) *Store {
	return &Store{
		client:  client,
		project: project,
		service: service,
		path:    path,
	}
}

// Info returns some information about the store.
// TODO: delete this and pass the information directly rather than through store.
func (s *Store) Info() map[string]string {
	return map[string]string{
		"type":    storageType,
		"version": storageVersion,
		"service": s.service,
		"path":    s.path,
	}
}

// Exists checks if a data entity with the given name exists.
func (s *Store) Exists(datatype, realm, user, id string, rev int64) (bool, error) {
	ctx := context.Background() /* TODO: pass ctx from request */
	k := datastore.NameKey(entityKind, s.newEntityKey(datatype, realm, user, id, rev), nil)
	err := s.client.Get(ctx, k, &Entity{})
	if err == datastore.ErrNoSuchEntity {
		return false, nil
	}
	if err != nil {
		return false, err
	}
	return true, nil
}

// Read reads a data entity.
func (s *Store) Read(datatype, realm, user, id string, rev int64, content proto.Message) error {
	return s.ReadTx(datatype, realm, user, id, rev, content, nil)
}

// ReadTx reads a data entity inside a transaction.
// ReadTx will not see the writes inside the transaction.
func (s *Store) ReadTx(datatype, realm, user, id string, rev int64, content proto.Message, tx storage.Tx) (ferr error) {
	if tx == nil {
		var err error
		tx, err = s.Tx(false)
		if err != nil {
			return err
		}
		defer func() {
			err := tx.Finish()
			if ferr == nil {
				ferr = err
			}
		}()
	}
	dstx, ok := tx.(*Tx)
	if !ok {
		return status.Errorf(codes.InvalidArgument, "invalid transaction")
	}
	k := datastore.NameKey(entityKind, s.newEntityKey(datatype, realm, user, id, rev), nil)
	e, err := s.newEntity(k, datatype, realm, user, id, rev, content)
	if err != nil {
		return err
	}
	if err = dstx.Tx.Get(k, e); err != nil {
		if err == datastore.ErrNoSuchEntity {
			return status.Errorf(codes.NotFound, "not found: %q", k)
		}
		return err
	}
	if err := jsonpb.Unmarshal(strings.NewReader(e.Content), content); err != nil {
		return err
	}
	return nil
}
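
// The calls below are a minimal usage sketch of the read path. The project,
// service, path, datatype and ID strings are placeholders, and cpb.HistoryEntry
// merely stands in for whichever proto message the caller actually stores:
//
//	s := NewStore(ctx, "my-gcp-project", "dam", "/path/to/config")
//	out := &cpb.HistoryEntry{}
//	ok, err := s.Exists("my-datatype", storage.DefaultRealm, storage.DefaultUser, "my-id", storage.LatestRev)
//	if err == nil && ok {
//		err = s.Read("my-datatype", storage.DefaultRealm, storage.DefaultUser, "my-id", storage.LatestRev, out)
//	}
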
// MultiReadTx reads a set of data entities matching the filters.
// MultiReadTx will not see the writes inside the transaction.
// If realm is "" reads all realms.
// If user is "" reads all users.
// Returns a results object and error.
func (s *Store) MultiReadTx(datatype, realm, user, id string, filters [][]storage.Filter, offset, pageSize int, typ proto.Message, tx storage.Tx) (_ *storage.Results, ferr error) {
	ctx := context.Background() /* TODO: pass ctx from request */
	if tx == nil {
		var err error
		tx, err = s.Tx(false)
		if err != nil {
			return nil, err
		}
		defer func() {
			err := tx.Finish()
			if ferr == nil {
				ferr = err
			}
		}()
	}
	if pageSize > storage.MaxPageSize {
		pageSize = storage.MaxPageSize
	}

	q := datastore.NewQuery(entityKind).
		Filter("service =", s.service).
		Filter("type =", datatype)
	if realm != storage.AllRealms {
		q = q.Filter("realm =", realm)
	}
	if user != storage.MatchAllUsers {
		q = q.Filter("user_id = ", user)
	}
	if id != storage.MatchAllIDs {
		q = q.Filter("id = ", id)
	}
	q = q.Filter("rev = ", storage.LatestRev).Order("id")
	if len(filters) == 0 {
		// No post-filtering, so limit the query directly as an optimization.
		// Still can't use q.Limit(pageSize) because we want the total number of matches.
		q = q.Offset(offset)
		offset = 0
	}

	results := storage.NewResults()
	it := s.client.Run(ctx, q)
	for {
		var e Entity
		_, err := it.Next(&e)
		if err == iterator.Done {
			break
		}
		if err != nil {
			return nil, err
		}
		if len(e.Content) == 0 {
			continue
		}
		p := proto.Clone(typ)
		if err := jsonpb.Unmarshal(strings.NewReader(e.Content), p); err != nil {
			return nil, err
		}
		if !storage.MatchProtoFilters(filters, p) {
			continue
		}
		// Offset cannot use q.Offset(x) because it must match complex filters above.
		// For pagination, decrease any remaining offset before accepting this entry.
		if offset > 0 {
			offset--
			continue
		}
		if pageSize == 0 || pageSize > results.MatchCount {
			results.Entries = append(results.Entries, &storage.Entry{
				Realm:   realm,
				GroupID: e.User,
				ItemID:  e.ID,
				Item:    p,
			})
		}
		results.MatchCount++
	}
	return results, nil
}
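
// A minimal sketch of listing the latest revision of every matching entity in
// one realm, 50 entries per page; the datatype string is a placeholder, no
// post-filters are applied (nil), and cpb.HistoryEntry stands in for the
// caller's stored proto type:
//
//	res, err := s.MultiReadTx("my-datatype", storage.DefaultRealm, storage.MatchAllUsers, storage.MatchAllIDs, nil, 0 /* offset */, 50 /* pageSize */, &cpb.HistoryEntry{}, nil)
//	if err == nil {
//		for _, entry := range res.Entries {
//			_ = entry.Item // the decoded proto; res.MatchCount holds the total match count
//		}
//	}
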
// ReadHistory reads the history.
func (s *Store) ReadHistory(datatype, realm, user, id string, content *[]proto.Message) error {
	return s.ReadHistoryTx(datatype, realm, user, id, content, nil)
}

// ReadHistoryTx reads the history inside a transaction.
func (s *Store) ReadHistoryTx(datatype, realm, user, id string, content *[]proto.Message, tx storage.Tx) (ferr error) {
	ctx := context.Background() /* TODO: pass ctx from request */
	if tx == nil {
		var err error
		tx, err = s.Tx(false)
		if err != nil {
			return err
		}
		defer func() {
			err := tx.Finish()
			if ferr == nil {
				ferr = err
			}
		}()
	}

	// TODO: handle pagination.
	q := datastore.NewQuery(historyKind).
		Filter("service =", s.service).
		Filter("type =", datatype).
		Filter("realm =", realm).
		Filter("user_id =", user).
		Filter("id =", id).
		Order("rev").
		Limit(storage.MaxPageSize)

	results := make([]History, storage.MaxPageSize)
	if _, err := s.client.GetAll(ctx, q, &results); err != nil {
		return err
	}
	for _, e := range results {
		if len(e.Content) == 0 {
			continue
		}
		he := &cpb.HistoryEntry{}
		if err := jsonpb.Unmarshal(strings.NewReader(e.Content), he); err != nil {
			return err
		}
		*content = append(*content, he)
	}
	return nil
}

// Write writes a data entity.
func (s *Store) Write(datatype, realm, user, id string, rev int64, content proto.Message, history proto.Message) error {
	return s.WriteTx(datatype, realm, user, id, rev, content, history, nil)
}

// WriteTx writes a data entity inside a transaction.
func (s *Store) WriteTx(datatype, realm, user, id string, rev int64, content proto.Message, history proto.Message, tx storage.Tx) (ferr error) {
	if tx == nil {
		var err error
		tx, err = s.Tx(true)
		if err != nil {
			return err
		}
		defer func() {
			err := tx.Finish()
			if ferr == nil {
				ferr = err
			}
		}()
	}
	dstx, ok := tx.(*Tx)
	if !ok {
		return status.Errorf(codes.InvalidArgument, "invalid transaction")
	}

	// TODO: ensure that the handling of last rev between write and delete are correct.
	if rev != storage.LatestRev {
		rk := datastore.NameKey(entityKind, s.newEntityKey(datatype, realm, user, id, rev), nil)
		re, err := s.newEntity(rk, datatype, realm, user, id, rev, content)
		if err != nil {
			return err
		}
		if _, err = dstx.Tx.Put(rk, re); err != nil {
			dstx.Rollback()
			return err
		}
	}
	if history != nil {
		hk := datastore.NameKey(historyKind, s.newHistoryKey(datatype, realm, user, id, rev), nil)
		he, err := s.newHistory(hk, datatype, realm, user, id, rev, history)
		if err != nil {
			dstx.Rollback()
			return err
		}
		if _, err = dstx.Tx.Put(hk, he); err != nil {
			dstx.Rollback()
			return err
		}
	}
	k := datastore.NameKey(entityKind, s.newEntityKey(datatype, realm, user, id, storage.LatestRev), nil)
	e, err := s.newEntity(k, datatype, realm, user, id, storage.LatestRev, content)
	if err != nil {
		dstx.Rollback()
		return err
	}
	if _, err := dstx.Tx.Put(k, e); err != nil {
		dstx.Rollback()
		return err
	}
	return nil
}

// Delete deletes a data entity.
func (s *Store) Delete(datatype, realm, user, id string, rev int64) error {
	return s.DeleteTx(datatype, realm, user, id, rev, nil)
}

// DeleteTx deletes a data entity inside a transaction.
func (s *Store) DeleteTx(datatype, realm, user, id string, rev int64, tx storage.Tx) (ferr error) {
	if tx == nil {
		var err error
		tx, err = s.Tx(true)
		if err != nil {
			return err
		}
		defer func() {
			err := tx.Finish()
			if ferr == nil {
				ferr = err
			}
		}()
	}
	dstx, ok := tx.(*Tx)
	if !ok {
		return status.Errorf(codes.InvalidArgument, "invalid transaction")
	}
	k := datastore.NameKey(entityKind, s.newEntityKey(datatype, realm, user, id, rev), nil)
	if err := dstx.Tx.Delete(k); err != nil {
		dstx.Rollback()
		if err == datastore.ErrNoSuchEntity {
			return status.Errorf(codes.NotFound, "not found: %q", k)
		}
		return err
	}
	return nil
}

// MultiDeleteTx deletes many records of a certain data type within a realm.
// It is limited to maxRowsPerBatchOperation rows; beyond that an LRO is needed instead.
// If user is "", deletes for all users.
func (s *Store) MultiDeleteTx(datatype, realm, user string, tx storage.Tx) error {
	q := datastore.NewQuery(entityKind).
		Filter("service =", s.service).
		Filter("type =", datatype).
		Filter("realm =", realm)
	if user != storage.DefaultUser {
		q = q.Filter("user_id =", user)
	}
	q = q.Filter("rev = ", storage.LatestRev).
		Order("id")
	_, err := s.multiDelete(context.Background() /* TODO: pass ctx from request */, q, maxRowsPerBatchOperation)
	return err
}
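
// A minimal sketch of the write path: writing at storage.LatestRev updates only
// the "latest" row, while a concrete revision number also snapshots that
// revision, and a non-nil history message is recorded under the history kind.
// The datatype and ID strings are placeholders and cpb.HistoryEntry stands in
// for the caller's proto types:
//
//	content := &cpb.HistoryEntry{} // stands in for whichever proto the caller stores
//	hist := &cpb.HistoryEntry{}    // optional audit record written to the history kind
//	err := s.Write("my-datatype", storage.DefaultRealm, storage.DefaultUser, "my-id", storage.LatestRev, content, hist)
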
Filter("service =", s.service) if realm != storage.AllRealms { q = q.Filter("realm =", realm) } if maxEntries > 0 { q.Limit(max) } total, err := s.multiDelete(ctx, q, max) if err != nil { return deleted, err } results[kind] = total max -= total deleted += total if max <= 0 { return deleted, nil } } glog.Infof("Datastore wipe project %q service %q realm %q: completed results: %#v", s.project, s.service, realm, results) return deleted, nil } // multiDelete all entities matching the provided query. // Returns the total number of items matching the query. func (s *Store) multiDelete(ctx context.Context, q *datastore.Query, maxEntries int) (int, error) { keys, err := s.client.GetAll(ctx, q.KeysOnly(), nil) if err != nil { return 0, err } if maxEntries <= 0 { maxEntries = maxRowsPerBatchOperation } max := maxEntries // Datastore API doesn't allow more than 500 per MultiDelete rpc. chunkSize := 400 total := len(keys) for i := 0; i < total; i += chunkSize { if chunkSize > max { chunkSize = max } end := i + chunkSize if total < end { end = total } chunk := keys[i:end] if err := s.client.DeleteMulti(ctx, chunk); err != nil { return total, err } max -= chunkSize if max <= 0 { return maxEntries, nil } } return total, nil } // Tx creates a new transaction for the store. func (s *Store) Tx(update bool) (storage.Tx, error) { var err error var dstx *datastore.Transaction if update { dstx, err = s.client.NewTransaction(context.Background() /* TODO: pass ctx from request */) } else { dstx, err = s.client.NewTransaction(context.Background() /* TODO: pass ctx from request */, datastore.ReadOnly) } if err != nil { return nil, err } return &Tx{ update: update, client: s.client, Tx: dstx, }, nil } const ( minJitter = 1 * 1e9 // nanoseconds as integer for math maxJitter = 3 * 1e9 // nanoseconds as integer for math ) // LockTx returns a storage-wide lock by the given name. Only one such lock should // be requested at a time. If Tx is provided, it must be an update Tx. // TODO: get rid of this function and fix the code using it. // Note: This doesn't provide distributed mutual exclusion, don't use this. func (s *Store) LockTx(lockName string, minFrequency time.Duration, tx storage.Tx) storage.Tx { if tx == nil { var err error tx, err = s.Tx(true) if err != nil { return nil } // Do not defer tx.Finish() as it must be not be freed unless the lock attempt fails. } else if !tx.IsUpdate() { return nil } entry := cpb.HistoryEntry{} locked := false for try := 0; try < 5; try++ { if err := s.ReadTx(storage.LockDatatype, storage.DefaultRealm, storage.DefaultUser, lockName, storage.LatestRev, &entry, tx); err == nil || storage.ErrNotFound(err) { // Will setup the object below. locked = true break } jitter := minJitter + rand.Float64()*(maxJitter-minJitter) time.Sleep(time.Duration(jitter)) } if !locked { tx.Finish() return nil } if diff := time.Now().Sub(time.Unix(int64(entry.CommitTime), 0)); diff < minFrequency { tx.Finish() return nil } entry.CommitTime = float64(time.Now().Unix()) if err := s.WriteTx(storage.LockDatatype, storage.DefaultRealm, storage.DefaultUser, lockName, storage.LatestRev, &entry, nil, tx); err != nil { tx.Finish() return nil } return tx } // Init initilizes the store. // It creates some metadata information about the store on datastore. // If metada information already exists on datastore, it comapres to see if they // are compatible with the metadata information of the current store. 
// Init initializes the store.
// It creates some metadata information about the store on datastore.
// If metadata information already exists on datastore, it compares it with the
// metadata of the current store to check that they are compatible.
func (s *Store) Init(ctx context.Context) error {
	k := datastore.NameKey(metaKind, s.newMetaKey(metaVersion), nil)
	meta := &Meta{}
	if err := s.client.Get(context.Background() /* TODO: pass ctx from request */, k, meta); err == datastore.ErrNoSuchEntity {
		meta = &Meta{
			Key:   k,
			Name:  metaVersion,
			Value: storageVersion,
		}
		_, err := s.client.Put(context.Background() /* TODO: pass ctx from request */, k, meta)
		if err != nil {
			return status.Errorf(codes.Internal, "cannot write datastore metadata: %v", err)
		}
	} else if err != nil {
		return status.Errorf(codes.Internal, "cannot access datastore metadata: %v", err)
	}
	glog.Infof("Datastore service %q version: %s", s.service, meta.Value)
	if meta.Value != storageVersion {
		return status.Errorf(codes.FailedPrecondition, "datastore version not compatible: expected %q, got %q", storageVersion, meta.Value)
	}
	return nil
}

// Data

func (s *Store) newHistoryKey(datatype, realm, user, id string, rev int64) string {
	r := storage.LatestRevName
	if rev > 0 {
		r = fmt.Sprintf("%06d", rev)
	}
	if user == storage.DefaultUser {
		user = "~"
	}
	return fmt.Sprintf("%s/%s.%s/%s/%s/%s/%s", s.service, datatype, storage.HistoryRevName, realm, user, id, r)
}

func (s *Store) newMeta(key *datastore.Key) *Meta {
	return &Meta{
		Key:   key,
		Name:  "version",
		Value: storageVersion,
	}
}

func (s *Store) newMetaKey(id string) string {
	return fmt.Sprintf("%s/%s/%s/%s", s.service, "meta", id, "meta")
}

func (s *Store) newEntityKey(datatype, realm, user, id string, rev int64) string {
	r := storage.LatestRevName
	if rev > 0 {
		r = fmt.Sprintf("%06d", rev)
	}
	if user == storage.DefaultUser {
		user = "~"
	}
	return fmt.Sprintf("%s/%s/%s/%s/%s/%s", s.service, datatype, realm, user, id, r)
}

func (s *Store) newEntity(key *datastore.Key, datatype, realm, user, id string, rev int64, content proto.Message) (*Entity, error) {
	js, err := (&jsonpb.Marshaler{}).MarshalToString(content)
	if err != nil {
		return nil, err
	}
	return &Entity{
		Key:      key,
		Service:  s.service,
		Datatype: datatype,
		Realm:    realm,
		User:     user,
		ID:       id,
		Rev:      rev,
		Version:  storageVersion,
		Modified: time.Now().Unix(),
		Content:  js,
	}, nil
}

func (s *Store) newHistory(key *datastore.Key, datatype, realm, user, id string, rev int64, content proto.Message) (*History, error) {
	js, err := (&jsonpb.Marshaler{}).MarshalToString(content)
	if err != nil {
		return nil, err
	}
	return &History{
		Key:      key,
		Service:  s.service,
		Datatype: datatype,
		Realm:    realm,
		User:     user,
		ID:       id,
		Rev:      rev,
		Version:  storageVersion,
		Modified: time.Now().Unix(),
		Content:  js,
	}, nil
}
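
// As an illustration of the name-key layout produced above, an entity key is
// the slash-joined form "<service>/<datatype>/<realm>/<user>/<id>/<rev>" (the
// default user is rendered as "~" and non-latest revisions are zero-padded to
// six digits), while a history key appends "."+storage.HistoryRevName to the
// datatype segment. For example, assuming a "dam" service and a placeholder
// "config" datatype, revision 7 of item "main" in realm "master" for the
// default user would be stored under:
//
//	dam/config/master/~/main/000007
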
// Transaction

// Tx is a transaction.
type Tx struct {
	update bool
	client *datastore.Client
	Tx     *datastore.Transaction
}

// IsUpdate tells if the transaction is an update or read-only.
func (tx *Tx) IsUpdate() bool {
	return tx.update
}

// Finish attempts to commit a transaction.
func (tx *Tx) Finish() error {
	if tx.Tx == nil {
		return nil
	}
	_, err := tx.Tx.Commit()
	if err != nil {
		glog.Infof("datastore error committing transaction: %v", err)
		return err
	}
	tx.Tx = nil
	return nil
}

// Rollback attempts to rollback a transaction.
func (tx *Tx) Rollback() error {
	if tx.Tx == nil {
		return nil
	}
	err := tx.Tx.Rollback()
	if err != nil {
		glog.Infof("datastore error during rollback of transaction: %v", err)
		return err
	}
	// Transaction cannot be used after a rollback.
	tx.Tx = nil
	return nil
}

// MakeUpdate will upgrade a read-only transaction to an update transaction.
func (tx *Tx) MakeUpdate() error {
	if tx.IsUpdate() {
		return nil
	}
	if err := tx.Finish(); err != nil {
		return err
	}
	dstx, err := tx.client.NewTransaction(context.Background() /* TODO: pass ctx from request */)
	if err != nil {
		return err
	}
	tx.update = true
	tx.Tx = dstx
	return nil
}
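
// A minimal sketch of sharing one update transaction across several store calls
// so that they commit or roll back together; the datatype and ID strings are
// placeholders and cpb.HistoryEntry stands in for the stored proto:
//
//	tx, err := s.Tx(true)
//	if err != nil {
//		return err
//	}
//	defer tx.Finish()
//	content := &cpb.HistoryEntry{}
//	if err := s.ReadTx("my-datatype", storage.DefaultRealm, storage.DefaultUser, "my-id", storage.LatestRev, content, tx); err != nil {
//		return err
//	}
//	return s.WriteTx("my-datatype", storage.DefaultRealm, storage.DefaultUser, "my-id", storage.LatestRev, content, nil, tx)
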