runner/localstore/store.go (81 lines of code) (raw):
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
package localstore
import (
"fmt"
"io"
"os"
"path/filepath"
"sync"
)
// Store is a filesystem-like key/value storage.
//
// Each key/value has committed and ingesting status. When OpenWriter returns
// ingestion transcation, the Store opens rootDir/ingest/$random file to
// receive value data. Once all the data is written, the Commit(ref) moves the
// file into rootDir/data/ref.
type Store struct {
sync.Mutex
dataDir string
ingestDir string
}
// NewStore returns new instance of Store.
func NewStore(rootDir string) (*Store, error) {
if !filepath.IsAbs(rootDir) {
return nil, fmt.Errorf("%s is not absolute path", rootDir)
}
dataDir := filepath.Join(rootDir, "data")
if err := os.MkdirAll(dataDir, 0600); err != nil {
return nil, fmt.Errorf("failed to ensure data dir %s: %w", dataDir, err)
}
ingestDir := filepath.Join(rootDir, "ingest")
if err := os.MkdirAll(ingestDir, 0600); err != nil {
return nil, fmt.Errorf("failed to ensure ingest dir %s: %w", ingestDir, err)
}
return &Store{
dataDir: dataDir,
ingestDir: ingestDir,
}, nil
}
// OpenWriter is to initiate a writing operation, ingestion transcation. A
// single ingestion transcation is to open temporary file and allow caller to
// write data into the temporary file. Once all the data is written, the caller
// should call Commit to complete ingestion transcation.
func (s *Store) OpenWriter() (Writer, error) {
f, err := os.CreateTemp(s.ingestDir, "ingest-*")
if err != nil {
return nil, fmt.Errorf("failed to create ingest file: %w", err)
}
return &writer{
s: s,
name: f.Name(),
f: f,
}, nil
}
// OpenReader is to open committed content named by ref.
func (s *Store) OpenReader(ref string) (Reader, error) {
s.Lock()
defer s.Unlock()
target := filepath.Join(s.dataDir, ref)
stat, err := os.Stat(target)
if err != nil {
return nil, fmt.Errorf("failed to ensure if ref %s exists: %w", ref, err)
}
size := stat.Size()
f, err := os.Open(target)
if err != nil {
return nil, fmt.Errorf("failed to open ref %s: %w", ref, err)
}
return &sizeReadCloser{
File: f,
size: size,
}, nil
}
// Delete is to delete committed content named by ref.
func (s *Store) Delete(ref string) error {
s.Lock()
defer s.Unlock()
target := filepath.Join(s.dataDir, ref)
_, err := os.Stat(target)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return fmt.Errorf("failed to ensure if ref %s exists: %w", ref, err)
}
return os.Remove(target)
}
// Writer handles writing of content into local store
type Writer interface {
// Close closes the writer.
//
// If the writer has not been committed, this allows aborting.
// Calling Close on a closed writer will not error.
io.WriteCloser
// Commit commits data as file named by ref.
//
// Commit always close Writer. If ref already exists, it will return
// error.
Commit(ref string) error
}
// Reader extends io.ReadCloser interface with io.ReaderAt and reporting of Size.
type Reader interface {
io.ReaderAt
io.ReadCloser
Size() int64
}