lib/store/simple_store.go (66 lines of code) (raw):

// Copyright (c) 2016-2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package store import ( "fmt" "io" "os" "github.com/andres-erbsen/clock" "github.com/docker/distribution/uuid" "github.com/uber-go/tally" "github.com/uber/kraken/lib/store/base" ) // SimpleStore allows uploading / caching raw files of any format. type SimpleStore struct { *uploadStore *cacheStore cleanup *cleanupManager } // NewSimpleStore creates a new SimpleStore. func NewSimpleStore(config SimpleStoreConfig, stats tally.Scope) (*SimpleStore, error) { stats = stats.Tagged(map[string]string{ "module": "simplestore", }) uploadStore, err := newUploadStore(config.UploadDir, config.ReadPartSize, config.WritePartSize) if err != nil { return nil, fmt.Errorf("new upload store: %s", err) } cacheBackend := base.NewLocalFileStore(clock.New()) cacheStore, err := newCacheStore(config.CacheDir, cacheBackend, config.ReadPartSize) if err != nil { return nil, fmt.Errorf("new cache store: %s", err) } cleanup, err := newCleanupManager(clock.New(), stats) if err != nil { return nil, fmt.Errorf("new cleanup manager: %s", err) } cleanup.addJob("upload", config.UploadCleanup, uploadStore.newFileOp()) cleanup.addJob("cache", config.CacheCleanup, cacheStore.newFileOp()) return &SimpleStore{uploadStore, cacheStore, cleanup}, nil } // Close terminates goroutines started by s. func (s *SimpleStore) Close() { s.cleanup.stop() } // MoveUploadFileToCache commits uploadName as cacheName. func (s *SimpleStore) MoveUploadFileToCache(uploadName, cacheName string) error { uploadPath, err := s.uploadStore.newFileOp().GetFilePath(uploadName) if err != nil { return err } defer s.DeleteUploadFile(uploadName) return s.cacheStore.newFileOp().MoveFileFrom(cacheName, s.cacheStore.state, uploadPath) } // CreateCacheFile initializes a cache file for name from r. func (s *SimpleStore) CreateCacheFile(name string, r io.Reader) error { tmp := fmt.Sprintf("%s.%s", name, uuid.Generate().String()) if err := s.CreateUploadFile(tmp, 0); err != nil { return fmt.Errorf("create upload file: %s", err) } defer s.DeleteUploadFile(tmp) w, err := s.GetUploadFileReadWriter(tmp) if err != nil { return fmt.Errorf("get upload writer: %s", err) } defer w.Close() if _, err := io.Copy(w, r); err != nil { return fmt.Errorf("copy: %s", err) } if err := s.MoveUploadFileToCache(tmp, name); err != nil && !os.IsExist(err) { return fmt.Errorf("move upload file to cache: %s", err) } return nil }