pkg/output/s3/s3.go (81 lines of code) (raw):
// Package s3 implements the output of logs to an AWS s3 bucket
//
// For configuration, "type", "bucket", "region", and "prefix" are all required.
//
// "delimiter" is optional and controls the character written between log entries, by default "/n"
//
// output:
// type: aws:s3
// bucket: "bucket_name"
// region: "us-west"
// delimiter: "/n"
// prefix: "my_name" ;; my_name_0123456789_001.gz
//
// Assumptions:
//
// - Either aws credentials file or environment variables (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY) are set.
//
// - Credentials have rights to put an S3 object into the bucket.
package s3
import (
"bytes"
"compress/gzip"
"context"
"fmt"
"math/rand"
"sync"
"time"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go/aws"
"github.com/elastic/go-ucfg"
"github.com/elastic/spigot/pkg/output"
)
// Name is the name used in the configuration file and the registry.
const Name = "s3"
var (
doOnce sync.Once
uploader *manager.Uploader
)
// S3Output holds config for writing to S3.
type S3Output struct {
delimiter string
bucket string
key string
prefix string
buf *bytes.Buffer
gw *gzip.Writer
}
func init() {
output.Register(Name, New)
}
// New is factory for creating a new S3Output
func New(cfg *ucfg.Config) (s output.Output, err error) {
c := defaultConfig()
if err := cfg.Unpack(&c); err != nil {
return nil, err
}
doOnce.Do(func() {
cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(c.Region))
if err != nil {
panic(err)
}
uploader = manager.NewUploader(s3.NewFromConfig(cfg))
})
var buf bytes.Buffer
gw := gzip.NewWriter(&buf)
key := fmt.Sprintf("%s_%19d_%03d.gz", c.Prefix, time.Now().UnixNano(), rand.Intn(1000))
s = &S3Output{
delimiter: c.Delimiter,
bucket: c.Bucket,
key: key,
prefix: c.Prefix,
buf: &buf,
gw: gw,
}
return s, nil
}
// Write writes log entry to internal buffer
func (s *S3Output) Write(b []byte) (n int, err error) {
j, err := s.gw.Write(b)
if err != nil {
return j, err
}
k, err := s.gw.Write([]byte(s.delimiter))
return j + k, err
}
// Close closes internal buffer and uploads data to S3
func (s *S3Output) Close() error {
s.gw.Close()
_, err := uploader.Upload(context.TODO(), &s3.PutObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(s.key),
Body: s.buf,
})
return err
}
func (s *S3Output) NewInterval() error {
s.Close()
s.buf.Reset()
s.gw = gzip.NewWriter(s.buf)
s.key = fmt.Sprintf("%s_%19d_%3d.gz", s.prefix, time.Now().UnixNano(), rand.Intn(1000))
return nil
}