registry/storage/purgeuploads.go (129 lines of code) (raw):
package storage
import (
"context"
"fmt"
"path"
"strings"
"sync"
"time"
"github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/uuid"
"github.com/sirupsen/logrus"
)
// uploadData stored the location of temporary files created during a layer upload
// along with the date the upload was started
type uploadData struct {
containingDir string
startedAt time.Time
}
func newUploadData() uploadData {
return uploadData{
containingDir: "",
// default to far in future to protect against missing startedat
startedAt: time.Now().Add(10000 * time.Hour),
}
}
// syncUploadData provides thread-safe operations on a map of uploadData.
type syncUploadData struct {
sync.Mutex
members map[string]uploadData
}
// set the passed uuid's uploadData to data
func (s *syncUploadData) set(u string, data uploadData) {
s.Lock()
defer s.Unlock()
s.members[u] = data
}
// get uploadData by uuid
func (s *syncUploadData) get(u string) (uploadData, bool) {
s.Lock()
defer s.Unlock()
up, ok := s.members[u]
return up, ok
}
// PurgeUploads deletes files from the upload directory
// created before olderThan. The list of files deleted and errors
// encountered are returned
func PurgeUploads(ctx context.Context, sdriver driver.StorageDriver, olderThan time.Time, actuallyDelete bool) ([]string, []error) {
logrus.Infof("PurgeUploads starting: olderThan=%s, actuallyDelete=%t", olderThan, actuallyDelete)
outstandingUploads, errors := getOutstandingUploads(ctx, sdriver)
var deleted []string
for _, outstandingUpload := range outstandingUploads {
if outstandingUpload.startedAt.Before(olderThan) {
var err error
logrus.Infof("Upload files in %s have older date (%s) than purge date (%s). Removing upload directory.",
outstandingUpload.containingDir, outstandingUpload.startedAt, olderThan)
if actuallyDelete {
err = sdriver.Delete(ctx, outstandingUpload.containingDir)
}
if err == nil {
deleted = append(deleted, outstandingUpload.containingDir)
} else {
errors = append(errors, err)
}
}
}
logrus.Infof("Purge uploads finished. Num deleted=%d, num errors=%d", len(deleted), len(errors))
return deleted, errors
}
// getOutstandingUploads walks the upload directory, collecting files
// which could be eligible for deletion. The only reliable way to
// classify the age of a file is with the date stored in the startedAt
// file, so gather files by UUID with a date from startedAt.
func getOutstandingUploads(ctx context.Context, sdriver driver.StorageDriver) (map[string]uploadData, []error) {
var errors []error
uploads := syncUploadData{sync.Mutex{}, make(map[string]uploadData, 0)}
root, err := pathFor(repositoriesRootPathSpec{})
if err != nil {
return uploads.members, append(errors, err)
}
errCh := make(chan error)
errDone := make(chan struct{})
go func() {
for err := range errCh {
errors = append(errors, err)
}
errDone <- struct{}{}
}()
err = sdriver.WalkParallel(ctx, root, func(fileInfo driver.FileInfo) error {
filePath := fileInfo.Path()
_, file := path.Split(filePath)
if (strings.HasPrefix(file, "_") || strings.HasPrefix(file, "hashstates")) && fileInfo.IsDir() && file != "_uploads" {
// Reserved directory
return driver.ErrSkipDir
}
uuid, isContainingDir := uuidFromPath(filePath)
if uuid == "" {
// Cannot reliably delete
return nil
}
ud, ok := uploads.get(uuid)
if !ok {
ud = newUploadData()
}
if isContainingDir {
ud.containingDir = filePath
}
if file == "startedat" {
t, err := readStartedAtFile(sdriver, filePath)
if err != nil {
errCh <- fmt.Errorf("%s: %s", filePath, err)
return nil
}
ud.startedAt = t
}
uploads.set(uuid, ud)
return nil
})
if err != nil {
errCh <- fmt.Errorf("%s: %s", root, err)
}
close(errCh)
<-errDone
return uploads.members, errors
}
// uuidFromPath extracts the upload UUID from a given path
// If the UUID is the last path component, this is the containing
// directory for all upload files
func uuidFromPath(p string) (string, bool) {
components := strings.Split(p, "/")
for i := len(components) - 1; i >= 0; i-- {
if u, err := uuid.Parse(components[i]); err == nil {
return u.String(), i == len(components)-1
}
}
return "", false
}
// readStartedAtFile reads the date from an upload's startedAtFile
func readStartedAtFile(sdriver driver.StorageDriver, targetPath string) (time.Time, error) {
// todo:(richardscothern) - pass in a context
startedAtBytes, err := sdriver.GetContent(context.Background(), targetPath)
if err != nil {
return time.Now(), err
}
startedAt, err := time.Parse(time.RFC3339, string(startedAtBytes))
if err != nil {
return time.Now(), err
}
return startedAt, nil
}