experimental/s3_storage.go:
package experimental

import (
	"flag"
	"io/ioutil"
	"os"
	"path/filepath"
	"strings"
	"sync/atomic"

	"github.com/Sirupsen/logrus"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)
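
// listMax is the page size for ListObjects; S3 returns at most 1000 keys per
// request, so this is the largest useful value.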
const listMax = 1000
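
// s3Storage implements storageObject on top of an S3 bucket, counting API
// calls by cost class and tracking how well the local read cache performs.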
type s3Storage struct {
	*distributionStorageS3
	S3                *s3.S3
	apiCalls          int64
	expensiveApiCalls int64
	freeApiCalls      int64
	cacheHits         int64
	cacheError        int64
	cacheMiss         int64
}
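
// s3CacheStorage is the local directory used to cache objects read from S3,
// keyed by ETag; an empty value disables the cache.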
var s3CacheStorage = flag.String("s3-storage-cache", "tmp-cache", "Local directory used to cache S3 reads (empty disables the cache)")
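
// fullPath maps a registry-relative path onto the live
// docker/registry/v2 key space.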
func (f *s3Storage) fullPath(path string) string {
	return filepath.Join(f.RootDirectory, "docker", "registry", "v2", path)
}
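
// backupPath maps a registry-relative path onto the parallel docker-backup
// tree that Move copies objects into.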
func (f *s3Storage) backupPath(path string) string {
	return filepath.Join(f.RootDirectory, "docker-backup", "registry", "v2", path)
}
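
// Walk calls fn for every object below path, reporting keys relative to
// baseDir and paging through ListObjects results listMax keys at a time.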
func (f *s3Storage) Walk(path string, baseDir string, fn walkFunc) error {
	path = f.fullPath(path)
	if path != "/" && path[len(path)-1] != '/' {
		path = path + "/"
	}
	baseDir = f.fullPath(baseDir)
	if baseDir != "/" && baseDir[len(baseDir)-1] != '/' {
		baseDir = baseDir + "/"
	}

	atomic.AddInt64(&f.apiCalls, 1)
	resp, err := f.S3.ListObjects(&s3.ListObjectsInput{
		Bucket:  aws.String(f.Bucket),
		Prefix:  aws.String(path),
		MaxKeys: aws.Int64(listMax),
	})
	if err != nil {
		return err
	}

	for {
		// Remember the last key of this page so a truncated listing can be
		// resumed from the right position.
		lastKey := ""
		for _, key := range resp.Contents {
			lastKey = *key.Key
			keyPath := *key.Key
			if strings.HasPrefix(keyPath, baseDir) {
				keyPath = keyPath[len(baseDir):]
			}
			if keyPath == "" {
				continue
			}
			// Keys ending in "/" are directory placeholders, not objects.
			if strings.HasSuffix(keyPath, "/") {
				logrus.Debugln("S3 Walk:", keyPath, "for", baseDir)
				continue
			}
			fi := fileInfo{
				fullPath:     *key.Key,
				size:         *key.Size,
				etag:         *key.ETag,
				lastModified: *key.LastModified,
			}
			err = fn(keyPath, fi, err)
			if err != nil {
				return err
			}
		}

		if *resp.IsTruncated {
			// Fetch the next page, starting after the last key we processed.
			atomic.AddInt64(&f.apiCalls, 1)
			resp, err = f.S3.ListObjects(&s3.ListObjectsInput{
				Bucket:  aws.String(f.Bucket),
				Prefix:  aws.String(path),
				MaxKeys: aws.Int64(listMax),
				Marker:  aws.String(lastKey),
			})
			if err != nil {
				return err
			}
		} else {
			break
		}
	}
	return nil
}
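
// List calls fn for the immediate children of path; the "/" delimiter makes
// S3 report subdirectories as common prefixes rather than expanding them.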
func (f *s3Storage) List(path string, fn walkFunc) error {
	path = f.fullPath(path)
	if path != "/" && path[len(path)-1] != '/' {
		path = path + "/"
	}

	atomic.AddInt64(&f.apiCalls, 1)
	resp, err := f.S3.ListObjects(&s3.ListObjectsInput{
		Bucket:    aws.String(f.Bucket),
		Prefix:    aws.String(path),
		Delimiter: aws.String("/"),
		MaxKeys:   aws.Int64(listMax),
	})
	if err != nil {
		return err
	}

	for {
		// Objects directly under path.
		for _, key := range resp.Contents {
			keyPath := *key.Key
			if strings.HasPrefix(keyPath, path) {
				keyPath = keyPath[len(path):]
			}
			if keyPath == "" {
				continue
			}
			fi := fileInfo{
				fullPath:     *key.Key,
				size:         *key.Size,
				etag:         *key.ETag,
				lastModified: *key.LastModified,
				directory:    strings.HasSuffix(*key.Key, "/"),
			}
			err = fn(keyPath, fi, err)
			if err != nil {
				return err
			}
		}

		// With a "/" delimiter, each common prefix is an immediate
		// subdirectory of path.
		for _, commonPrefix := range resp.CommonPrefixes {
			prefixPath := *commonPrefix.Prefix
			if strings.HasPrefix(prefixPath, path) {
				prefixPath = prefixPath[len(path):]
			}
			if prefixPath == "" {
				continue
			}
			fi := fileInfo{
				fullPath:  *commonPrefix.Prefix,
				directory: true,
			}
			err = fn(prefixPath, fi, err)
			if err != nil {
				return err
			}
		}

		if *resp.IsTruncated {
			atomic.AddInt64(&f.apiCalls, 1)
			resp, err = f.S3.ListObjects(&s3.ListObjectsInput{
				Bucket:    aws.String(f.Bucket),
				Prefix:    aws.String(path),
				MaxKeys:   aws.Int64(listMax),
				Delimiter: aws.String("/"),
				Marker:    resp.NextMarker,
			})
			if err != nil {
				return err
			}
		} else {
			break
		}
	}
	return nil
}
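
// Read fetches an object's contents. When an etag is given and the cache is
// enabled, a cached copy with a matching ETag is served without touching S3;
// otherwise the object is fetched with GetObject and cached for next time.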
func (f *s3Storage) Read(path string, etag string) ([]byte, error) {
	cachePath := filepath.Join(*s3CacheStorage, path)
	if etag != "" && *s3CacheStorage != "" {
		file, err := ioutil.ReadFile(cachePath)
		if err == nil {
			if compareEtag(file, etag) {
				atomic.AddInt64(&f.cacheHits, 1)
				return file, nil
			}
			// The cached file exists but no longer matches the expected ETag.
			atomic.AddInt64(&f.cacheError, 1)
		} else if os.IsNotExist(err) {
			atomic.AddInt64(&f.cacheMiss, 1)
			logrus.Infoln("CACHE MISS:", path)
		}
	}

	atomic.AddInt64(&f.apiCalls, 1)
	resp, err := f.S3.GetObject(&s3.GetObjectInput{
		Bucket: aws.String(f.Bucket),
		Key:    aws.String(f.fullPath(path)),
	})
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	data, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	// Populate the cache best-effort; a failed write only costs a future miss.
	if etag != "" && *s3CacheStorage != "" {
		os.MkdirAll(filepath.Dir(cachePath), 0700)
		ioutil.WriteFile(cachePath, data, 0600)
	}
	return data, nil
}
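
// Delete removes the object at path. S3 does not charge for DELETE requests,
// so this increments freeApiCalls instead of apiCalls.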
func (f *s3Storage) Delete(path string) error {
	atomic.AddInt64(&f.freeApiCalls, 1)
	_, err := f.S3.DeleteObject(&s3.DeleteObjectInput{
		Bucket: aws.String(f.Bucket),
		Key:    aws.String(f.fullPath(path)),
	})
	return err
}
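
// Move emulates a rename, which S3 lacks, as a copy followed by a delete.
// Note that the destination is resolved through backupPath, so moved objects
// land in the docker-backup tree.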
func (f *s3Storage) Move(path, newPath string) error {
	atomic.AddInt64(&f.expensiveApiCalls, 1)
	_, err := f.S3.CopyObject(&s3.CopyObjectInput{
		CopySource: aws.String("/" + f.Bucket + "/" + f.fullPath(path)),
		Bucket:     aws.String(f.Bucket),
		Key:        aws.String(f.backupPath(newPath)),
	})
	if err != nil {
		return err
	}
	return f.Delete(path)
}
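
// Info logs a one-line summary of the API-call and cache counters.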
func (f *s3Storage) Info() {
	logrus.Infoln("S3 INFO: API calls/expensive/free:",
		atomic.LoadInt64(&f.apiCalls), atomic.LoadInt64(&f.expensiveApiCalls), atomic.LoadInt64(&f.freeApiCalls),
		"Cache (hit/miss/error):",
		atomic.LoadInt64(&f.cacheHits), atomic.LoadInt64(&f.cacheMiss), atomic.LoadInt64(&f.cacheError))
}
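
// newS3Storage wires up an S3 client from the registry configuration,
// honoring a custom endpoint, region, and optional static credentials.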
func newS3Storage(config *distributionStorageS3) (storageObject, error) {
	awsConfig := aws.NewConfig()
	awsConfig.Endpoint = config.RegionEndpoint
	awsConfig.Region = config.Region
	if config.AccessKey != "" && config.SecretKey != "" {
		awsConfig.Credentials = credentials.NewStaticCredentials(config.AccessKey, config.SecretKey, "")
	}
	sess, err := session.NewSession()
	if err != nil {
		return nil, err
	}
	storage := &s3Storage{
		distributionStorageS3: config,
		S3:                    s3.New(sess, awsConfig),
	}
	return storage, nil
}