lib/store/ca_store.go (137 lines of code) (raw):
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package store
import (
"fmt"
"hash"
"io"
"os"
"path"
"github.com/andres-erbsen/clock"
"github.com/docker/distribution/uuid"
"github.com/spaolacci/murmur3"
"github.com/uber-go/tally"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/hrw"
"github.com/uber/kraken/lib/store/base"
)
// CAStore allows uploading / caching content-addressable files.
//
// It embeds both an uploadStore (staging area for in-progress uploads) and a
// cacheStore (the content-addressed cache proper), so their file operations
// are promoted onto CAStore directly. Field order is load-bearing: NewCAStore
// constructs this struct with a positional literal.
type CAStore struct {
	config CAStoreConfig
	*uploadStore
	*cacheStore
	cleanup *cleanupManager // background cleanup jobs for upload/cache dirs; stopped by Close.
}
// NewCAStore creates a new CAStore rooted at the configured upload and cache
// directories. It wires up the upload store, the LRU-backed cache store, any
// configured CAS volumes, and a cleanup manager with one job per store.
// Callers must Close the returned store to stop the cleanup goroutines.
func NewCAStore(config CAStoreConfig, stats tally.Scope) (*CAStore, error) {
	config = config.applyDefaults()
	stats = stats.Tagged(map[string]string{
		"module": "castore",
	})

	us, err := newUploadStore(config.UploadDir, config.ReadPartSize, config.WritePartSize)
	if err != nil {
		return nil, fmt.Errorf("new upload store: %s", err)
	}

	backend := base.NewCASFileStoreWithLRUMap(config.Capacity, clock.New())
	cs, err := newCacheStore(config.CacheDir, backend, config.ReadPartSize)
	if err != nil {
		return nil, fmt.Errorf("new cache store: %s", err)
	}

	// Symlink the cache's 256 shard subdirs onto volumes, if any are configured.
	if err := initCASVolumes(config.CacheDir, config.Volumes); err != nil {
		return nil, fmt.Errorf("init cas volumes: %s", err)
	}

	mgr, err := newCleanupManager(clock.New(), stats)
	if err != nil {
		return nil, fmt.Errorf("new cleanup manager: %s", err)
	}
	mgr.addJob("upload", config.UploadCleanup, us.newFileOp())
	mgr.addJob("cache", config.CacheCleanup, cs.newFileOp())

	return &CAStore{
		config:      config,
		uploadStore: us,
		cacheStore:  cs,
		cleanup:     mgr,
	}, nil
}
// Close terminates any goroutines started by s.
// Specifically, it stops the cleanup manager's background jobs.
func (s *CAStore) Close() {
	s.cleanup.stop()
}
// MoveUploadFileToCache commits uploadName as cacheName. Clients are expected
// to validate the content of the upload file matches the cacheName digest.
func (s *CAStore) MoveUploadFileToCache(uploadName, cacheName string) error {
	uploadPath, err := s.uploadStore.newFileOp().GetFilePath(uploadName)
	if err != nil {
		return err
	}
	// The upload file is removed whether or not the commit succeeds; on
	// success the underlying file has already been moved into the cache.
	defer s.DeleteUploadFile(uploadName)

	r, err := s.uploadStore.newFileOp().GetFileReader(uploadName, s.uploadStore.readPartSize)
	if err != nil {
		return fmt.Errorf("get file reader %s: %s", uploadName, err)
	}
	defer r.Close()

	// Check that the upload's content hashes to cacheName before committing.
	if err := s.verify(r, cacheName); err != nil {
		return fmt.Errorf("verify digest: %s", err)
	}

	return s.cacheStore.newFileOp().MoveFileFrom(cacheName, s.cacheStore.state, uploadPath)
}
// CreateCacheFile initializes a cache file for name from r. name should be a raw
// hex sha256 digest, and the contents of r must hash to name.
func (s *CAStore) CreateCacheFile(name string, r io.Reader) error {
	// Delegate to WriteCacheFile with a writer that simply streams r.
	copyFrom := func(dst FileReadWriter) error {
		_, err := io.Copy(dst, r)
		return err
	}
	return s.WriteCacheFile(name, copyFrom)
}
// WriteCacheFile initializes a cache file for name by passing a temporary
// upload file writer to the write function. The temporary file is committed
// into the cache only if write succeeds and its content hashes to name.
func (s *CAStore) WriteCacheFile(name string, write func(w FileReadWriter) error) error {
	// Stage into a uniquely-named upload file so concurrent writers of the
	// same name cannot collide.
	tmpName := fmt.Sprintf("%s.%s", name, uuid.Generate().String())
	if err := s.CreateUploadFile(tmpName, 0); err != nil {
		return fmt.Errorf("create upload file: %s", err)
	}
	defer s.DeleteUploadFile(tmpName)

	rw, err := s.GetUploadFileReadWriter(tmpName)
	if err != nil {
		return fmt.Errorf("get upload writer: %s", err)
	}
	defer rw.Close()

	if err := write(rw); err != nil {
		return err
	}

	// A pre-existing cache entry for name is not an error: the content is
	// identical by construction, so an os.IsExist failure is tolerated.
	if err := s.MoveUploadFileToCache(tmpName, name); err != nil && !os.IsExist(err) {
		return fmt.Errorf("move upload file to cache: %s", err)
	}
	return nil
}
// verify verifies that name is a valid SHA256 digest, and checks that the
// given blob content matches the digest unless hash verification is
// explicitly skipped via config.
func (s *CAStore) verify(r io.Reader, name string) error {
	// name must parse as a hex-encoded SHA256 digest regardless of the
	// skip-verification setting.
	expected, err := core.NewSHA256DigestFromHex(name)
	if err != nil {
		return fmt.Errorf("new digest from file name: %s", err)
	}
	if s.config.SkipHashVerification {
		return nil
	}

	computed, err := core.NewDigester().FromReader(r)
	if err != nil {
		return fmt.Errorf("calculate digest: %s", err)
	}
	if computed != expected {
		return fmt.Errorf("computed digest %s doesn't match expected value %s", computed, expected)
	}
	return nil
}
// initCASVolumes spreads the cache's 256 shard subdirectories across the
// given volumes using rendezvous (HRW) hashing, creating the real directory
// on its assigned volume and a symlink to it under dir. A no-op when no
// volumes are configured.
func initCASVolumes(dir string, volumes []Volume) error {
	if len(volumes) == 0 {
		return nil
	}

	rh := hrw.NewRendezvousHash(
		func() hash.Hash { return murmur3.New64() },
		hrw.UInt64ToFloat64)

	// Every volume must exist before it can receive shards.
	for _, vol := range volumes {
		if _, err := os.Stat(vol.Location); err != nil {
			return fmt.Errorf("verify volume: %s", err)
		}
		rh.AddNode(vol.Location, vol.Weight)
	}

	// Create 256 symlinks under dir, one per two-hex-digit shard prefix.
	for i := 0; i < 256; i++ {
		subdir := fmt.Sprintf("%02X", i)
		nodes := rh.GetOrderedNodes(subdir, 1)
		if len(nodes) != 1 {
			return fmt.Errorf("calculate volume for subdir: %s", subdir)
		}
		src := path.Join(nodes[0].Label, path.Base(dir), subdir)
		if err := os.MkdirAll(src, 0775); err != nil {
			return fmt.Errorf("volume source path: %s", err)
		}
		if err := createOrUpdateSymlink(src, path.Join(dir, subdir)); err != nil {
			return fmt.Errorf("symlink to volume: %s", err)
		}
	}
	return nil
}