registry/storage/driver/filesystem/driver.go (365 lines of code) (raw):
package filesystem
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
"sort"
"time"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/base"
"github.com/docker/distribution/registry/storage/driver/factory"
)
const (
// driverName is the name this driver is registered under with the factory
// and the DriverName reported in the storagedriver errors it returns.
driverName = "filesystem"
// defaultRootDirectory is the root directory used when the "rootdirectory"
// parameter is not supplied.
defaultRootDirectory = "/var/lib/registry"
// defaultMaxThreads is the value maxThreads starts with, and the default
// handed to base.GetLimitFromParameter for the "maxthreads" parameter.
defaultMaxThreads = uint64(100)
// minThreads is the minimum value for the maxthreads configuration
// parameter. If the driver's parameters are less than this we set
// the parameters to minThreads
minThreads = uint64(25)
)
// DriverParameters represents all configuration options available for the
// filesystem driver.
type DriverParameters struct {
// RootDirectory is the directory that every driver path is joined onto.
RootDirectory string
// MaxThreads is the limit passed to base.NewRegulator when the driver is
// constructed by New.
MaxThreads uint64
}
// init registers the filesystem driver factory under driverName.
func init() {
factory.Register(driverName, &filesystemDriverFactory{})
}
// filesystemDriverFactory implements the factory.StorageDriverFactory interface
type filesystemDriverFactory struct{}
// Create builds a filesystem storage driver from the given parameters map by
// delegating to FromParameters.
func (*filesystemDriverFactory) Create(parameters map[string]any) (storagedriver.StorageDriver, error) {
return FromParameters(parameters)
}
// driver is the unexported filesystem implementation. New wraps it in a
// regulator and exposes it through Driver.
type driver struct {
// rootDirectory is the directory that fullPath joins every path onto.
rootDirectory string
}
// baseEmbed embeds base.Base; Driver in turn embeds baseEmbed.
type baseEmbed struct {
base.Base
}
// Driver is a storagedriver.StorageDriver implementation backed by a local
// filesystem. All provided paths will be subpaths of the RootDirectory.
// Its behavior comes from the embedded baseEmbed, which New populates.
type Driver struct {
baseEmbed
}
// FromParameters builds a Driver from a generic parameters map.
//
// Recognized (optional) keys:
//   - rootdirectory
//   - maxthreads
//
// Any error produced while parsing the parameters is returned unchanged and
// no driver is constructed.
func FromParameters(parameters map[string]any) (*Driver, error) {
	parsed, parseErr := fromParametersImpl(parameters)
	switch {
	case parseErr != nil:
		return nil, parseErr
	case parsed == nil:
		// Defensive: nothing to build a driver from, and no error to report.
		return nil, nil
	}
	return New(*parsed), nil
}
// fromParametersImpl converts a generic parameters map into DriverParameters.
//
// A nil map yields the defaults (defaultRootDirectory, defaultMaxThreads).
// Otherwise "rootdirectory", when present, is stringified with fmt.Sprint, and
// "maxthreads" is resolved through base.GetLimitFromParameter using minThreads
// and defaultMaxThreads. A failure resolving "maxthreads" is returned wrapped,
// so callers can still inspect the underlying error with errors.Is/errors.As.
func fromParametersImpl(parameters map[string]any) (*DriverParameters, error) {
	var (
		err           error
		maxThreads    = defaultMaxThreads
		rootDirectory = defaultRootDirectory
	)

	if parameters != nil {
		if rootDir, ok := parameters["rootdirectory"]; ok {
			rootDirectory = fmt.Sprint(rootDir)
		}

		maxThreads, err = base.GetLimitFromParameter(parameters["maxthreads"], minThreads, defaultMaxThreads)
		if err != nil {
			// %w keeps the error chain intact; the rendered message is the
			// same as it was with %s.
			return nil, fmt.Errorf("maxthreads config error: %w", err)
		}
	}

	params := &DriverParameters{
		RootDirectory: rootDirectory,
		MaxThreads:    maxThreads,
	}
	return params, nil
}
// New constructs a Driver rooted at params.RootDirectory. The underlying
// filesystem driver is wrapped with base.NewRegulator using params.MaxThreads.
func New(params DriverParameters) *Driver {
	fsDriver := &driver{rootDirectory: params.RootDirectory}
	wrapped := base.Base{
		StorageDriver: base.NewRegulator(fsDriver, params.MaxThreads),
	}
	return &Driver{baseEmbed: baseEmbed{Base: wrapped}}
}
// Implement the storagedriver.StorageDriver interface

// Name returns the name of this driver, driverName.
func (*driver) Name() string {
return driverName
}
// GetContent reads the whole file stored at targetPath and returns its bytes.
// Errors from opening the file (see Reader) or from reading it are returned
// as-is; no partial content is returned on a read error.
func (d *driver) GetContent(ctx context.Context, targetPath string) ([]byte, error) {
	src, openErr := d.Reader(ctx, targetPath, 0)
	if openErr != nil {
		return nil, openErr
	}
	defer src.Close()

	payload, readErr := io.ReadAll(src)
	if readErr == nil {
		return payload, nil
	}
	return nil, readErr
}
// PutContent writes contents to the file at subPath, replacing whatever was
// there before (the writer is opened in non-append mode). If copying fails the
// writer is canceled and the copy error is returned; otherwise the result of
// Commit is returned.
func (d *driver) PutContent(ctx context.Context, subPath string, contents []byte) error {
	fw, err := d.Writer(ctx, subPath, false)
	if err != nil {
		return err
	}
	// The result of the deferred Close is intentionally not inspected.
	defer fw.Close()

	if _, copyErr := io.Copy(fw, bytes.NewReader(contents)); copyErr != nil {
		// Best-effort cleanup; the copy error is the one worth reporting.
		_ = fw.Cancel()
		return copyErr
	}
	return fw.Commit()
}
// Reader opens the file stored at targetPath for reading and positions it at
// the given byte offset.
//
// A missing file yields storagedriver.PathNotFoundError. A failed seek returns
// the seek error, and a seek that lands before offset yields
// storagedriver.InvalidOffsetError; in both cases the file is closed first.
func (d *driver) Reader(_ context.Context, targetPath string, offset int64) (io.ReadCloser, error) {
	// The permission bits are irrelevant here: O_CREATE is not passed, so
	// this call never creates a file.
	fh, openErr := os.OpenFile(d.fullPath(targetPath), os.O_RDONLY, 0o600)
	switch {
	case openErr == nil:
		// opened successfully, carry on
	case os.IsNotExist(openErr):
		return nil, storagedriver.PathNotFoundError{Path: targetPath, DriverName: driverName}
	default:
		return nil, openErr
	}

	reached, seekErr := fh.Seek(offset, io.SeekStart)
	if seekErr == nil && reached >= offset {
		return fh, nil
	}

	// Both failure modes release the handle before reporting.
	_ = fh.Close()
	if seekErr != nil {
		return nil, seekErr
	}
	return nil, storagedriver.InvalidOffsetError{Path: targetPath, Offset: offset, DriverName: driverName}
}
// Writer returns a storagedriver.FileWriter for the file at subPath, creating
// missing parent directories.
//
// When doAppend is false the file is created if needed and truncated to zero
// length. When doAppend is true the file must already exist: a missing file
// yields storagedriver.PathNotFoundError, otherwise the writer is positioned
// at the end of the file and its initial size is the existing file length.
func (d *driver) Writer(_ context.Context, subPath string, doAppend bool) (storagedriver.FileWriter, error) {
	fullPath := d.fullPath(subPath)
	parentDir := path.Dir(fullPath)

	if doAppend {
		if _, err := os.Stat(fullPath); err != nil {
			if os.IsNotExist(err) {
				// Report the driver-relative path, as Reader, Stat, List,
				// Move and Delete do, instead of the on-disk location.
				return nil, storagedriver.PathNotFoundError{Path: subPath, DriverName: driverName}
			}
			return nil, fmt.Errorf("checking if file exist while creating append writer: %w", err)
		}
	}

	// nolint:gosec // needs some more research so that we do not break anything
	if err := os.MkdirAll(parentDir, 0o777); err != nil {
		return nil, err
	}

	// nolint: gosec
	fp, err := os.OpenFile(fullPath, os.O_WRONLY|os.O_CREATE, 0o666)
	if err != nil {
		return nil, err
	}

	var offset int64
	if !doAppend {
		// Fresh write: discard any previous content.
		if err := fp.Truncate(0); err != nil {
			_ = fp.Close()
			return nil, err
		}
	} else {
		// Append: continue from the current end of the file.
		n, err := fp.Seek(0, io.SeekEnd)
		if err != nil {
			_ = fp.Close()
			return nil, err
		}
		offset = n
	}

	return newFileWriter(fp, offset), nil
}
// Stat returns the FileInfo for subPath, exposing the size and modification
// time reported by os.Stat. A missing path yields
// storagedriver.PathNotFoundError; any other stat error is returned as-is.
func (d *driver) Stat(_ context.Context, subPath string) (storagedriver.FileInfo, error) {
	info, statErr := os.Stat(d.fullPath(subPath))
	switch {
	case statErr == nil:
		// The returned info carries the driver-relative path.
		return fileInfo{FileInfo: info, path: subPath}, nil
	case os.IsNotExist(statErr):
		return nil, storagedriver.PathNotFoundError{Path: subPath, DriverName: driverName}
	default:
		return nil, statErr
	}
}
// List returns the direct children of subPath, each joined onto subPath, in
// lexical order. A missing directory yields storagedriver.PathNotFoundError.
func (d *driver) List(_ context.Context, subPath string) ([]string, error) {
	// nolint: gosec
	handle, openErr := os.Open(d.fullPath(subPath))
	if openErr != nil {
		if !os.IsNotExist(openErr) {
			return nil, openErr
		}
		return nil, storagedriver.PathNotFoundError{Path: subPath, DriverName: driverName}
	}
	defer handle.Close()

	entries, readErr := handle.Readdirnames(0)
	if readErr != nil {
		return nil, readErr
	}

	children := make([]string, len(entries))
	for i := range entries {
		children[i] = path.Join(subPath, entries[i])
	}

	// Directory read order is not guaranteed; sort for a stable result.
	sort.Strings(children)
	return children, nil
}
// Move renames the object at sourcePath to destPath, creating the destination's
// parent directories first. A missing source yields
// storagedriver.PathNotFoundError.
func (d *driver) Move(_ context.Context, sourcePath, destPath string) error {
	from, to := d.fullPath(sourcePath), d.fullPath(destPath)

	// Only "does not exist" is acted upon here; any other stat outcome falls
	// through to the directory creation and rename below.
	_, statErr := os.Stat(from)
	if os.IsNotExist(statErr) {
		return storagedriver.PathNotFoundError{Path: sourcePath, DriverName: driverName}
	}

	// nolint:gosec // needs some more research so that we do not break anything
	mkErr := os.MkdirAll(path.Dir(to), 0o755)
	if mkErr != nil {
		return mkErr
	}

	return os.Rename(from, to)
}
// Delete removes subPath and everything beneath it. A missing path yields
// storagedriver.PathNotFoundError; any other stat error is returned as-is.
func (d *driver) Delete(_ context.Context, subPath string) error {
	target := d.fullPath(subPath)

	if _, statErr := os.Stat(target); statErr != nil {
		if os.IsNotExist(statErr) {
			return storagedriver.PathNotFoundError{Path: subPath, DriverName: driverName}
		}
		return statErr
	}

	return os.RemoveAll(target)
}
// DeleteFiles calls Delete for every entry in paths and, after each one,
// removes that entry's parent directory if it is left empty.
//
// The call is idempotent: a path that does not exist is not an error and is
// still included in the returned count. Parent directories are never counted.
// Processing stops at the first other error, which is returned together with
// the count reached so far.
func (d *driver) DeleteFiles(ctx context.Context, paths []string) (int, error) {
	var deleted int

	for _, target := range paths {
		delErr := d.Delete(ctx, target)
		if delErr != nil && !errors.As(delErr, new(storagedriver.PathNotFoundError)) {
			return deleted, delErr
		}
		deleted++

		// Clean up the parent directory too, when nothing is left in it.
		parentDir := d.fullPath(filepath.Dir(target))
		// nolint:gosec
		dirHandle, openErr := os.Open(parentDir)
		if openErr != nil {
			// An *os.PathError (e.g. the directory is already gone) is skipped.
			if _, isPathErr := openErr.(*os.PathError); isPathErr {
				continue
			}
			return deleted, openErr
		}

		// Ask for a single entry: io.EOF means the directory is empty.
		_, readErr := dirHandle.Readdir(1)
		if readErr == io.EOF {
			if rmErr := os.Remove(parentDir); rmErr != nil {
				_ = dirHandle.Close()
				return deleted, rmErr
			}
		}
		_ = dirHandle.Close()
	}

	return deleted, nil
}
// URLFor is not supported by the filesystem driver. It ignores its arguments
// and always returns an empty string together with
// storagedriver.ErrUnsupportedMethod.
func (*driver) URLFor(_ context.Context, _ string, _ map[string]any) (string, error) {
return "", storagedriver.ErrUnsupportedMethod{DriverName: driverName}
}
// Walk traverses a filesystem defined within driver, starting from the given
// path, calling f on each file. The traversal itself is delegated to
// storagedriver.WalkFallback.
func (d *driver) Walk(ctx context.Context, targetPath string, f storagedriver.WalkFn) error {
return storagedriver.WalkFallback(ctx, d, targetPath, f)
}
// WalkParallel has the same signature as Walk and currently just calls it,
// starting from the given path and calling f on each file.
func (d *driver) WalkParallel(ctx context.Context, targetPath string, f storagedriver.WalkFn) error {
// NOTE(prozlach): WalkParallel will go away at some point, see
// https://gitlab.com/gitlab-org/container-registry/-/issues/1182#note_2258251909
// for more context.
return d.Walk(ctx, targetPath, f)
}
// fullPath returns the on-disk path of a key within the Driver's storage: the
// driver's rootDirectory joined with subPath using path.Join.
func (d *driver) fullPath(subPath string) string {
return path.Join(d.rootDirectory, subPath)
}
// fileInfo pairs an os.FileInfo with the path it describes so that it can be
// returned as a storagedriver.FileInfo.
type fileInfo struct {
os.FileInfo
// path is the value reported by Path.
path string
}
// Compile-time check that fileInfo satisfies storagedriver.FileInfo.
var _ storagedriver.FileInfo = fileInfo{}
// Path provides the path of the target of this file info, as stored in the
// path field (Stat sets it to the path it was called with).
func (fi fileInfo) Path() string {
return fi.path
}
// Size reports the length of the file in bytes, as given by the wrapped
// os.FileInfo. Directories always report 0, since a size is meaningless for
// them.
func (fi fileInfo) Size() int64 {
	if !fi.IsDir() {
		return fi.FileInfo.Size()
	}
	return 0
}
// ModTime returns the modification time for the file, as reported by the
// wrapped os.FileInfo.
func (fi fileInfo) ModTime() time.Time {
return fi.FileInfo.ModTime()
}
// IsDir returns true if the path is a directory, as reported by the wrapped
// os.FileInfo.
func (fi fileInfo) IsDir() bool {
return fi.FileInfo.IsDir()
}
// fileWriter is the writer returned by the driver's Writer method. It buffers
// writes to an *os.File and tracks whether it has been closed, committed or
// canceled.
type fileWriter struct {
// file is the underlying file being written to.
file *os.File
// size starts at the value given to newFileWriter and grows by the number
// of bytes accepted by each Write.
size int64
// bw buffers writes in front of file.
bw *bufio.Writer
// closed is set by Close once the file has been flushed, synced and closed.
closed bool
// committed is set by Commit once buffered data has been flushed and synced.
committed bool
// canceled is set by Cancel.
canceled bool
}
// newFileWriter wraps file in a buffered fileWriter whose reported size starts
// at size.
func newFileWriter(file *os.File, size int64) *fileWriter {
	fw := new(fileWriter)
	fw.file = file
	fw.size = size
	fw.bw = bufio.NewWriter(file)
	return fw
}
// Write appends p to the buffered writer and adds the number of bytes accepted
// to the running size. It refuses to write once the writer has been closed,
// committed or canceled, returning the matching storagedriver error.
func (fw *fileWriter) Write(p []byte) (int, error) {
	if fw.closed {
		return 0, storagedriver.ErrAlreadyClosed
	}
	if fw.committed {
		return 0, storagedriver.ErrAlreadyCommited
	}
	if fw.canceled {
		return 0, storagedriver.ErrAlreadyCanceled
	}

	written, writeErr := fw.bw.Write(p)
	fw.size += int64(written) // nolint: gosec // Write will always return non-negative number of bytes written
	return written, writeErr
}
// Size returns the writer's running size: the initial size it was created with
// plus the bytes accepted by Write so far.
func (fw *fileWriter) Size() int64 {
return fw.size
}
// Close flushes buffered data, syncs the file to disk and closes it.
//
// It returns storagedriver.ErrAlreadyClosed if the writer was already closed,
// and nil without doing anything if the writer was canceled. When flushing or
// syncing fails, the underlying file is still closed so that its descriptor is
// not leaked, and the wrapped flush/sync error is returned. The writer is only
// marked as closed when every step succeeds.
func (fw *fileWriter) Close() error {
	if fw.closed {
		return storagedriver.ErrAlreadyClosed
	}

	if fw.canceled {
		// NOTE(prozlach): If the writer has already been canceled, then there
		// is nothing to flush to the backend as the target file has already
		// been deleted.
		return nil
	}

	if err := fw.bw.Flush(); err != nil {
		// Release the descriptor; the flush error is the one worth reporting.
		_ = fw.file.Close()
		return fmt.Errorf("flushing file while closing writer: %w", err)
	}

	if err := fw.file.Sync(); err != nil {
		// Release the descriptor; the sync error is the one worth reporting.
		_ = fw.file.Close()
		return fmt.Errorf("syncing file while closing writer: %w", err)
	}

	if err := fw.file.Close(); err != nil {
		return fmt.Errorf("closing file while closing writer: %w", err)
	}

	fw.closed = true
	return nil
}
// Cancel abandons the write: it marks the writer as canceled, closes the
// underlying file and removes it from disk. A file that is already gone is not
// treated as an error. Cancel is rejected with the matching storagedriver error
// once the writer has been closed or committed.
func (fw *fileWriter) Cancel() error {
	switch {
	case fw.closed:
		return storagedriver.ErrAlreadyClosed
	case fw.committed:
		return storagedriver.ErrAlreadyCommited
	}

	fw.canceled = true
	// The close result is irrelevant: the file is about to be removed.
	_ = fw.file.Close()

	name := fw.file.Name()
	rmErr := os.Remove(name)
	if rmErr == nil || errors.Is(rmErr, os.ErrNotExist) {
		return nil
	}
	return fmt.Errorf("removing file %q: %w", name, rmErr)
}
// Commit flushes buffered data to the file and syncs it to disk, then marks the
// writer as committed. It is rejected with the matching storagedriver error
// once the writer has been closed, committed or canceled. Flush and sync errors
// are returned as-is and leave the writer uncommitted.
func (fw *fileWriter) Commit() error {
	if fw.closed {
		return storagedriver.ErrAlreadyClosed
	}
	if fw.committed {
		return storagedriver.ErrAlreadyCommited
	}
	if fw.canceled {
		return storagedriver.ErrAlreadyCanceled
	}

	flushErr := fw.bw.Flush()
	if flushErr != nil {
		return flushErr
	}

	syncErr := fw.file.Sync()
	if syncErr != nil {
		return syncErr
	}

	fw.committed = true
	return nil
}