plugins/teststeps/s3fileupload/s3fileupload.go (218 lines of code) (raw):

// Copyright (c) Facebook, Inc. and its affiliates.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.

// Package s3fileupload implements a test step that reads a local file,
// optionally packs it into a gzip-compressed tar archive, uploads it to an
// AWS S3 bucket and emits the resulting download URL as an event.
package s3fileupload

import (
	"archive/tar"
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/facebookincubator/contest/pkg/event"
	"github.com/facebookincubator/contest/pkg/event/testevent"
	"github.com/facebookincubator/contest/pkg/target"
	"github.com/facebookincubator/contest/pkg/test"
	"github.com/facebookincubator/contest/pkg/xcontext"
	"github.com/facebookincubator/contest/plugins/teststeps"
)

// Name is the name used to look this plugin up.
var Name = "s3FileUpload"

// Event names for this plugin.
const (
	// EventURL is emitted once per target after a successful upload; its
	// payload carries the download URL of the uploaded object.
	EventURL = event.Name("URL")
)

// Events defines the events that a TestStep is allowed to emit.
var Events = []event.Name{
	EventURL,
}

// FileUpload holds all the parameters the plugin needs. The fields are
// populated from the test step parameters by validateAndPopulate.
type FileUpload struct {
	localPath     *test.Param // Path to file that shall be uploaded
	fileName      *test.Param // Filename to file that shall be uploaded
	s3Region      string      // AWS server region that shall be used
	s3Bucket      string      // AWS Bucket name
	s3Path        string      // Path where in the bucket to upload the file
	s3CredFile    string      // Path to the AWS credential file to authenticate the session
	s3CredProfile string      // Profile of the AWS credential file that shall be used
	compGzip      bool        // Bool that defines if the data should be compressed to gzip before upload
}

// eventURLPayload is the payload that is JSON-encoded and attached to the
// EventURL event.
type eventURLPayload struct {
	Msg string // Download URL of the uploaded object
}

// Name returns the plugin name.
func (ts FileUpload) Name() string { return Name } func emitEvent(ctx xcontext.Context, name event.Name, payload interface{}, tgt *target.Target, ev testevent.Emitter) error { payloadStr, err := json.Marshal(payload) if err != nil { return fmt.Errorf("cannot encode payload for event '%s': %w", name, err) } rm := json.RawMessage(payloadStr) evData := testevent.Data{ EventName: name, Target: tgt, Payload: &rm, } if err := ev.Emit(ctx, evData); err != nil { return fmt.Errorf("cannot emit event EventURL: %w", err) } return nil } // Run executes the awsFileUpload. func (ts *FileUpload) Run(ctx xcontext.Context, ch test.TestStepChannels, params test.TestStepParameters, ev testevent.Emitter, resumeState json.RawMessage) (json.RawMessage, error) { // Validate the parameter if err := ts.validateAndPopulate(params); err != nil { return nil, err } f := func(ctx xcontext.Context, target *target.Target) error { // expand args path, err := ts.localPath.Expand(target) if err != nil { return fmt.Errorf("failed to expand argument '%s': %v", ts.localPath, err) } filename, err := ts.fileName.Expand(target) if err != nil { return fmt.Errorf("failed to expand argument dir '%s': %v", ts.fileName, err) } // bodyBytes will contain the file data to upload it var bodyBytes []byte // Compress if compress parameter is true if ts.compGzip { // Create the archive and write the output to the "out" Writer buf, err := createTarArchive(path, ctx) if err != nil { return fmt.Errorf("error creating an archive: %v", err) } filename = filename + ".tar.gz" bodyBytes = buf.Bytes() } else { // Read the file that should be uploaded bodyBytes, err = os.ReadFile(path) if err != nil { return fmt.Errorf("could not read the file: %v", err) } } // Upload file url, err := ts.upload(filename, bodyBytes, ctx) if err != nil { return fmt.Errorf("could not upload the file: %v", err) } // Emit URL event to get the url into the report if err := emitEvent(ctx, EventURL, eventURLPayload{Msg: url}, target, ev); err != nil { 
return fmt.Errorf("failed to emit event: %w", err) } return nil } return teststeps.ForEachTarget(Name, ctx, ch, f) } // Retrieve all the parameters defines through the jobDesc func (ts *FileUpload) validateAndPopulate(params test.TestStepParameters) error { // Retrieving parameter as json Raw.Message // validate path and filename ts.localPath = params.GetOne("path") if ts.localPath.IsEmpty() { return fmt.Errorf("missing or empty 'path' parameter") } ts.fileName = params.GetOne("filename") if ts.fileName.IsEmpty() { return fmt.Errorf("missing or empty 'filename' parameter") } // Retrieving parameter as string // validate s3region param := params.GetOne("s3region") if param.IsEmpty() { return fmt.Errorf("missing or empty 's3region' parameter") } ts.s3Region = param.String() // validate s3bucket param = params.GetOne("s3bucket") if param.IsEmpty() { return fmt.Errorf("missing or empty 's3bucket' parameter") } ts.s3Bucket = param.String() // validate s3path param = params.GetOne("s3path") if param.IsEmpty() { return fmt.Errorf("missing or empty 's3path' parameter") } ts.s3Path = param.String() // retrieve s3credfile ts.s3CredFile = params.GetOne("s3credfile").String() // retrieve s3credprofile ts.s3CredProfile = params.GetOne("s3credprofile").String() // Retrieving parameter as bool // validate compress param = params.GetOne("compgzip") if !param.IsEmpty() { v, err := strconv.ParseBool(param.String()) if err != nil { return fmt.Errorf("invalid non-boolean `compgzip` parameter: %v", err) } ts.compGzip = v } return nil } // ValidateParameters validates the parameters associated to the TestStep func (ts *FileUpload) ValidateParameters(_ xcontext.Context, params test.TestStepParameters) error { return ts.validateAndPopulate(params) } // New initializes and returns a new awsFileUpload test step. func New() test.TestStep { return &FileUpload{} } // Load returns the name, factory and events which are needed to register the step. 
func Load() (string, test.TestStepFactory, []event.Name) { return Name, New, Events } // createTarArchive creates compressed data writer and invokes addFileToArchive func createTarArchive(file string, ctx xcontext.Context) (*bytes.Buffer, error) { // Create buffer for the compressed data var buf bytes.Buffer // Create gzip and tar writers gzwriter := gzip.NewWriter(&buf) defer gzwriter.Close() tarwriter := tar.NewWriter(gzwriter) defer tarwriter.Close() // Write file into tar archive err := addFileToArchive(tarwriter, file, ctx) if err != nil { return &buf, err } return &buf, nil } // addFileToArchive takes the data and writes it into the tar archive func addFileToArchive(tarwriter *tar.Writer, filename string, ctx xcontext.Context) error { // Open the file which shall be written into the tar archive file, err := os.Open(filename) if err != nil { return fmt.Errorf("failed to open '%s': %w", filename, err) } defer func() { if err := file.Close(); err != nil { ctx.Warnf("failed to close file '%s': %w", filename, err) } }() // Retrieve the file stats info, err := file.Stat() if err != nil { return fmt.Errorf("failed to retrieve stats of '%s': %w", filename, err) } // Create a tar header from the file stats header, err := tar.FileInfoHeader(info, info.Name()) if err != nil { return err } header.Name = filename // Write file header to the tar archive err = tarwriter.WriteHeader(header) if err != nil { return err } // Write the file into the tar archive _, err = io.Copy(tarwriter, file) if err != nil { return err } return nil } // Upload the file that is specified in the JobDescritor func (ts *FileUpload) upload(filename string, data []byte, ctx xcontext.Context) (string, error) { // Create an AWS session s, err := session.NewSession(&aws.Config{Region: aws.String(ts.s3Region), Credentials: credentials.NewSharedCredentials( ts.s3CredFile, // your credential file path (default if empty) ts.s3CredProfile, // profile name (default if empty) )}) if err != nil { return "", 
fmt.Errorf("could not open a new session: %w", err) } // Creating an upload path where the file should be uploaded with a timestamp currentTime := time.Now() uploadPath := strings.Join([]string{ts.s3Path, currentTime.Format("20060102_150405")}, "/") uploadPath = strings.Join([]string{uploadPath, filename}, "_") // Uploading the file _, err = s3.New(s).PutObject(&s3.PutObjectInput{ Bucket: aws.String(ts.s3Bucket), Key: aws.String(uploadPath), ACL: aws.String("public-read"), Body: bytes.NewReader(data), ContentLength: aws.Int64(int64(len(data))), ContentType: aws.String(http.DetectContentType(data)), ContentDisposition: aws.String("attachment"), ServerSideEncryption: aws.String("AES256"), }) if err != nil { return "", fmt.Errorf("could not upload the file: %w", err) } else { ctx.Infof("Pushed the file to S3 Bucket!") } // Create download link for public ACL url := strings.Join([]string{"https://", ts.s3Bucket, ".s3.amazonaws.com/", uploadPath}, "") return url, nil }