pipe/s3.go (203 lines of code) (raw):
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package pipe
import (
"database/sql"
"fmt"
"io"
"os"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/uber/storagetapper/config"
"github.com/uber/storagetapper/log"
"github.com/uber/storagetapper/metrics"
"golang.org/x/net/context" //"context"
)
type s3Client struct {
client *s3.S3
uploader *s3manager.Uploader
downloader *s3manager.Downloader
bucket string
opTimeout time.Duration
}
//Reader and writer interfaces
type s3Writer struct {
w *io.PipeWriter
ch chan error
cancel context.CancelFunc
}
type s3Reader struct {
r *io.PipeReader
ch chan error
cancel context.CancelFunc
}
func (p *s3Writer) Write(b []byte) (int, error) {
return p.w.Write(b)
}
func (p *s3Writer) Close() error {
_ = p.w.Close()
return <-p.ch
}
func (p *s3Writer) Flush() error {
return nil
}
func (p *s3Reader) Read(b []byte) (int, error) {
return p.r.Read(b)
}
func (p *s3Reader) Close() error {
return p.r.Close()
}
//s3WriterAt is an adapter to use PipeWriter as a WriterAt
//this is possible because we use streaming download
type s3WriterAt struct {
w io.Writer
}
func (p *s3WriterAt) WriteAt(b []byte, off int64) (n int, err error) {
return p.w.Write(b)
}
func (p *s3Client) OpenRead(name string, offset int64) (io.ReadCloser, error) {
log.Debugf("OpenRead: %v offset=%v", name, offset)
if offset != 0 {
return nil, fmt.Errorf("s3 read is non seekable")
}
r, w := io.Pipe()
ch := make(chan error)
ctx, cancel := context.WithTimeout(context.Background(), p.opTimeout)
go func() {
defer cancel()
_, err := p.downloader.DownloadWithContext(ctx, &s3WriterAt{w}, &s3.GetObjectInput{Bucket: &p.bucket, Key: &name})
if cerr := w.Close(); err == nil {
err = cerr
}
ch <- err
}()
return &s3Reader{r: r, ch: ch, cancel: cancel}, nil
}
func (p *s3Client) OpenWrite(name string) (flushWriteCloser, io.Seeker, error) {
name = strings.TrimSuffix(name, ".open")
log.Debugf("OpenWrite: %v", name)
r, w := io.Pipe()
ch := make(chan error)
ctx, cancel := context.WithTimeout(context.Background(), p.opTimeout)
go func() {
defer cancel()
_, err := p.uploader.UploadWithContext(ctx, &s3manager.UploadInput{Bucket: &p.bucket, Key: &name, Body: r})
ch <- err
}()
return &s3Writer{w: w, ch: ch, cancel: cancel}, nil, nil
}
func (p *s3Client) MkdirAll(path string, _ os.FileMode) error {
return nil
}
//implements os.FileInfo interface for s3.ListItem
type s3Stat struct {
*s3.Object
}
func (fs *s3Stat) Size() int64 {
if fs.Object.Size == nil {
return 0
}
return *fs.Object.Size
}
func (fs *s3Stat) ModTime() time.Time {
if fs.LastModified == nil {
return time.Time{}
}
return *fs.LastModified
}
func (fs *s3Stat) Mode() os.FileMode { return 0777 }
func (fs *s3Stat) Sys() interface{} { return *fs.Object }
func (fs *s3Stat) Name() string { return *fs.Object.Key }
func (fs *s3Stat) IsDir() bool { return false }
func (p *s3Client) ReadDir(dirname string, _ string) (res []os.FileInfo, err error) {
log.Debugf("ReadDir: %v", dirname)
if len(dirname) != 0 && dirname[len(dirname)-1] != '/' {
dirname = dirname + "/"
}
if len(dirname) != 0 && dirname[0] == '/' {
dirname = dirname[1:]
}
resp := &s3.ListObjectsOutput{IsTruncated: aws.Bool(true)}
for aws.BoolValue(resp.IsTruncated) {
resp, err = p.client.ListObjects(&s3.ListObjectsInput{Bucket: &p.bucket, Prefix: &dirname, Marker: resp.Marker})
if err != nil {
return nil, err
}
for _, v := range resp.Contents {
*v.Key = strings.TrimPrefix(*v.Key, dirname)
res = append(res, &s3Stat{v})
}
}
return res, nil
}
func (p *s3Client) Rename(oldpath, newpath string) error {
return nil
}
func (p *s3Client) Remove(path string) error {
return nil
}
func (p *s3Client) Cancel(f io.Closer) error {
w, ok := f.(*s3Writer)
if ok {
w.cancel()
return <-w.ch
}
r, ok := f.(*s3Reader)
if ok {
r.cancel()
return <-r.ch
}
return fmt.Errorf("wrong type")
}
type s3Pipe struct {
filePipe
client *s3Client
}
// s3Consumer consumes messages from Terrablob using topic specified during consumer creation
type s3Consumer struct {
fileConsumer
}
func init() {
registerPlugin("s3", initS3Pipe)
}
func initS3Pipe(cfg *config.PipeConfig, db *sql.DB) (Pipe, error) {
w := &aws.Config{Region: &cfg.S3.Region, Endpoint: &cfg.S3.Endpoint, S3ForcePathStyle: aws.Bool(true)}
if cfg.S3.AccessKeyID != "" {
w.Credentials = credentials.NewStaticCredentials(cfg.S3.AccessKeyID, cfg.S3.SecretAccessKey, cfg.S3.SessionToken)
}
s, err := session.NewSession(w)
if log.E(err) {
return nil, err
}
client := s3.New(s)
result, err := client.CreateBucket(&s3.CreateBucketInput{
Bucket: &cfg.S3.Bucket,
})
if log.E(err) {
aerr, ok := err.(awserr.Error)
if !ok || (aerr.Code() != s3.ErrCodeBucketAlreadyExists && aerr.Code() != s3.ErrCodeBucketAlreadyOwnedByYou) {
return nil, err
}
} else {
log.Infof("S3 bucket created in: %s", result)
}
uploader := s3manager.NewUploaderWithClient(client, func(d *s3manager.Uploader) {
d.Concurrency = 1
})
downloader := s3manager.NewDownloaderWithClient(client, func(d *s3manager.Downloader) {
d.Concurrency = 1
})
return &s3Pipe{filePipe{cfg.S3.BaseDir, *cfg}, &s3Client{client, uploader, downloader, cfg.S3.Bucket, cfg.S3.Timeout}}, nil
}
// Type returns Pipe type as Terrablob
func (p *s3Pipe) Type() string {
return "s3"
}
//NewProducer registers a new Terrablob producer
func (p *s3Pipe) NewProducer(topic string) (Producer, error) {
m := metrics.NewFilePipeMetrics("pipe_producer", map[string]string{"topic": topic, "pipeType": "s3"})
return &fileProducer{filePipe: &p.filePipe, topic: topic, files: make(map[string]*file), fs: p.client, metrics: m, stats: make(map[string]*stat)}, nil
}
//NewConsumer registers a new Terrablob consumer
func (p *s3Pipe) NewConsumer(topic string) (Consumer, error) {
m := metrics.NewFilePipeMetrics("pipe_consumer", map[string]string{"topic": topic, "pipeType": "s3"})
c := &s3Consumer{fileConsumer{filePipe: &p.filePipe, topic: topic, fs: p.client, metrics: m}}
_, err := p.initConsumer(&c.fileConsumer, c.fetchNextPoll)
return c, err
}