lib/storage/file_storage.go (195 lines of code) (raw):
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync"
"time"
glog "github.com/golang/glog" /* copybara-comment */
"github.com/golang/protobuf/jsonpb" /* copybara-comment */
"github.com/golang/protobuf/proto" /* copybara-comment */
"github.com/GoogleCloudPlatform/healthcare-federated-access-services/lib/srcutil" /* copybara-comment: srcutil */
cpb "github.com/GoogleCloudPlatform/healthcare-federated-access-services/proto/common/v1" /* copybara-comment: go_proto */
)
const (
storageType = "file"
storageVersion = "v0"
)
type FileStorage struct {
service string
path string
mu sync.Mutex
cache *StorageCache
}
func NewFileStorage(service, path string) *FileStorage {
// Add the service name directory to the path:
// 1. Add the full service name if the subdirectory exists; or
// 2. The base service name (i.e. before the first "-" character).
servicePath := srcutil.Path(filepath.Join(path, service))
if err := checkFile(servicePath); err == nil {
path = servicePath
} else {
path = srcutil.Path(filepath.Join(path, strings.Split(service, "-")[0]))
}
glog.Infof("file storage for service %q using path %q.", service, path)
f := &FileStorage{
service: strings.Split(service, "-")[0],
path: path,
cache: NewStorageCache(),
}
return f
}
func (f *FileStorage) Info() map[string]string {
return map[string]string{
"type": storageType,
"version": storageVersion,
"service": f.service,
"path": f.path,
}
}
func (f *FileStorage) Exists(datatype, realm, user, id string, rev int64) (bool, error) {
fn := f.fname(datatype, realm, user, id, rev)
if _, ok := f.cache.GetEntity(fn); ok {
return true, nil
}
err := checkFile(fn)
if err == nil {
return true, nil
}
if os.IsNotExist(err) {
return false, nil
}
return false, err
}
func (f *FileStorage) Read(datatype, realm, user, id string, rev int64, content proto.Message) error {
return f.ReadTx(datatype, realm, user, id, rev, content, nil)
}
// ReadTx reads inside a transaction.
func (f *FileStorage) ReadTx(datatype, realm, user, id string, rev int64, content proto.Message, tx Tx) (ferr error) {
f.mu.Lock()
defer f.mu.Unlock()
fname := f.fname(datatype, realm, user, id, rev)
if tx == nil || !tx.IsUpdate() {
if data, ok := f.cache.GetEntity(fname); ok {
content.Reset()
proto.Merge(content, data)
return nil
}
}
if tx == nil {
var err error
tx, err = f.Tx(false)
if err != nil {
return fmt.Errorf("file read lock error: %v", err)
}
defer func() {
err := tx.Finish()
if ferr == nil {
ferr = err
}
}()
}
if err := checkFile(fname); err != nil {
return err
}
file, err := os.Open(fname)
if err != nil {
return fmt.Errorf("file %q I/O error: %v", fname, err)
}
defer file.Close()
if err := jsonpb.Unmarshal(file, content); err != nil && err != io.EOF {
dat, _ := ioutil.ReadFile(fname)
return fmt.Errorf("file %q invalid JSON: %v\n%s", fname, err, string(dat))
}
if rev == LatestRev {
f.cache.PutEntity(fname, content)
}
return nil
}
// MultiReadTx reads a set of objects matching the input parameters and filters
func (f *FileStorage) MultiReadTx(datatype, realm, user, id string, filters [][]Filter, offset, pageSize int, typ proto.Message, tx Tx) (*Results, error) {
return nil, fmt.Errorf("file storage does not support MultiReadTx")
}
func (f *FileStorage) ReadHistory(datatype, realm, user, id string, content *[]proto.Message) error {
return f.ReadHistoryTx(datatype, realm, user, id, content, nil)
}
// ReadHistoryTx reads hisotry inside a transaction.
func (f *FileStorage) ReadHistoryTx(datatype, realm, user, id string, content *[]proto.Message, tx Tx) (ferr error) {
if tx == nil {
var err error
tx, err = f.Tx(false)
if err != nil {
return fmt.Errorf("history file read lock error: %v", err)
}
defer func() {
err := tx.Finish()
if ferr == nil {
ferr = err
}
}()
}
hfname := f.historyName(datatype, realm, user, id)
if err := checkFile(hfname); err != nil {
return err
}
b, err := ioutil.ReadFile(hfname)
if err != nil {
return fmt.Errorf("history file %q I/O error: %v", hfname, err)
}
full := `{"history":[` + string(b[:]) + "]}"
his := &cpb.History{}
if err := jsonpb.Unmarshal(strings.NewReader(full), his); err != nil {
return fmt.Errorf("history file %q invalid JSON: %v", hfname, err)
}
for _, he := range his.History {
*content = append(*content, proto.Message(he))
}
f.cache.PutHistory(hfname, *content)
return nil
}
func (f *FileStorage) Write(datatype, realm, user, id string, rev int64, content proto.Message, history proto.Message) error {
return fmt.Errorf("file storage does not support Write")
}
func (f *FileStorage) WriteTx(datatype, realm, user, id string, rev int64, content proto.Message, history proto.Message, tx Tx) error {
return fmt.Errorf("file storage does not support WriteTx")
}
// Delete a record.
func (f *FileStorage) Delete(datatype, realm, user, id string, rev int64) error {
return fmt.Errorf("file storage does not support Delete")
}
// DeleteTx delete a record with transaction.
func (f *FileStorage) DeleteTx(datatype, realm, user, id string, rev int64, tx Tx) error {
return fmt.Errorf("file storage does not support DeleteTx")
}
// MultiDeleteTx deletes all records of a certain data type within a realm.
func (f *FileStorage) MultiDeleteTx(datatype, realm, user string, tx Tx) error {
return fmt.Errorf("file storage does not support MultiDeleteTx")
}
// Wipe deletes all records within a realm.
func (f *FileStorage) Wipe(ctx context.Context, realm string, batchNum, maxEntries int) (int, error) {
return 0, fmt.Errorf("file storage does not support Wipe")
}
func (f *FileStorage) Tx(update bool) (Tx, error) {
return &FileTx{
writer: update,
}, nil
}
// LockTx returns a storage-wide lock by the given name. Only one such lock should
// be requested at a time. If Tx is provided, it must be an update Tx.
func (f *FileStorage) LockTx(lockName string, minFrequency time.Duration, tx Tx) Tx {
// Filestore does not support writing transactions, and returning nil indicates that
// the lock is not acquired.
return nil
}
type FileTx struct {
writer bool
}
// Finish attempts to commit a transaction.
func (tx *FileTx) Finish() error {
return nil
}
// Rollback attempts to rollback a transaction.
func (tx *FileTx) Rollback() error {
return nil
}
// MakeUpdate will upgrade a read-only transaction to an update transaction.
func (tx *FileTx) MakeUpdate() error {
tx.writer = true
return nil
}
func (tx *FileTx) IsUpdate() bool {
return tx.writer
}
func UserFragment(user string) string {
if user == DefaultUser {
return ""
}
return "_" + user
}