s3plugin/s3plugin.go

package s3plugin

import (
	"errors"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	"os"
	"path/filepath"
	"regexp"
	"strconv"
	"strings"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/client"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3iface"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"github.com/cloudberrydb/gp-common-go-libs/gplog"
	"github.com/cloudberrydb/gp-common-go-libs/operating"
	"github.com/inhies/go-bytesize"
	"github.com/olekukonko/tablewriter"
	"github.com/urfave/cli"
	"gopkg.in/yaml.v2"
)

var Version string

const apiVersion = "0.5.0"
const Mebibyte = 1024 * 1024
const DefaultConcurrency = 6
const DefaultUploadChunkSize = int64(Mebibyte) * 500   // default 500MB
const DefaultDownloadChunkSize = int64(Mebibyte) * 500 // default 500MB

type Scope string

const (
	Master      Scope = "master"
	Coordinator Scope = "coordinator"
	SegmentHost Scope = "segment_host"
	Segment     Scope = "segment"
)

type PluginConfig struct {
	ExecutablePath string        `yaml:"executablepath"`
	Options        PluginOptions `yaml:"options"`
}

type PluginOptions struct {
	AwsAccessKeyId               string `yaml:"aws_access_key_id"`
	AwsSecretAccessKey           string `yaml:"aws_secret_access_key"`
	BackupMaxConcurrentRequests  string `yaml:"backup_max_concurrent_requests"`
	BackupMultipartChunksize     string `yaml:"backup_multipart_chunksize"`
	Bucket                       string `yaml:"bucket"`
	Encryption                   string `yaml:"encryption"`
	Endpoint                     string `yaml:"endpoint"`
	Folder                       string `yaml:"folder"`
	HttpProxy                    string `yaml:"http_proxy"`
	Region                       string `yaml:"region"`
	RemoveDuplicateBucket        string `yaml:"remove_duplicate_bucket"`
	RestoreMaxConcurrentRequests string `yaml:"restore_max_concurrent_requests"`
	RestoreMultipartChunksize    string `yaml:"restore_multipart_chunksize"`
	PgPort                       string `yaml:"pgport"`
	BackupPluginVersion          string `yaml:"backup_plugin_version"`

	UploadChunkSize     int64
	UploadConcurrency   int
	DownloadChunkSize   int64
	DownloadConcurrency int
}

func CleanupPlugin(c *cli.Context) error {
	return nil
}

func GetAPIVersion(c *cli.Context) {
	fmt.Println(apiVersion)
}

/*
 * Helper Functions
 */
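/*
 * Illustrative plugin configuration file for readAndValidatePluginConfig below.
 * The keys mirror the yaml tags on PluginConfig and PluginOptions; every value
 * (paths, bucket, credentials, sizes) is a placeholder, not a recommendation.
 *
 *   executablepath: /usr/local/bin/gpbackup_s3_plugin
 *   options:
 *     region: us-east-1
 *     aws_access_key_id: <access key id>
 *     aws_secret_access_key: <secret access key>
 *     bucket: my-backup-bucket
 *     folder: cluster1/backups
 *     encryption: "on"
 *     backup_multipart_chunksize: 100MB
 *     restore_multipart_chunksize: 100MB
 *     backup_max_concurrent_requests: "6"
 *     restore_max_concurrent_requests: "6"
 */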
func readAndValidatePluginConfig(configFile string) (*PluginConfig, error) {
	config := &PluginConfig{}
	contents, err := ioutil.ReadFile(configFile)
	if err != nil {
		return nil, err
	}
	if err = yaml.UnmarshalStrict(contents, config); err != nil {
		return nil, fmt.Errorf("Yaml failures encountered reading config file %s. Error: %s", configFile, err.Error())
	}
	if err = InitializeAndValidateConfig(config); err != nil {
		return nil, err
	}
	return config, nil
}
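// For example (config values are hypothetical): if backup_multipart_chunksize
// is left empty, UploadChunkSize stays at DefaultUploadChunkSize, i.e.
// 500 * 1024 * 1024 = 524288000 bytes; a value such as "100MB" is parsed by
// go-bytesize and overrides that default.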
func InitializeAndValidateConfig(config *PluginConfig) error {
	var err error
	var errTxt string
	opt := &config.Options

	// Initialize defaults
	if opt.Region == "" {
		opt.Region = "unused"
	}
	if opt.Encryption == "" {
		opt.Encryption = "on"
	}
	if opt.RemoveDuplicateBucket == "" {
		opt.RemoveDuplicateBucket = "false"
	}
	opt.UploadChunkSize = DefaultUploadChunkSize
	opt.UploadConcurrency = DefaultConcurrency
	opt.DownloadChunkSize = DefaultDownloadChunkSize
	opt.DownloadConcurrency = DefaultConcurrency

	// Validate configurations and overwrite defaults
	if config.ExecutablePath == "" {
		errTxt += fmt.Sprintf("executable_path must exist and cannot be empty in plugin configuration file\n")
	}
	if opt.Bucket == "" {
		errTxt += fmt.Sprintf("bucket must exist and cannot be empty in plugin configuration file\n")
	}
	if opt.Folder == "" {
		errTxt += fmt.Sprintf("folder must exist and cannot be empty in plugin configuration file\n")
	}
	if opt.AwsAccessKeyId == "" {
		if opt.AwsSecretAccessKey != "" {
			errTxt += fmt.Sprintf("aws_access_key_id must exist in plugin configuration file if aws_secret_access_key does\n")
		}
	} else if opt.AwsSecretAccessKey == "" {
		errTxt += fmt.Sprintf("aws_secret_access_key must exist in plugin configuration file if aws_access_key_id does\n")
	}
	if opt.Region == "unused" && opt.Endpoint == "" {
		errTxt += fmt.Sprintf("region or endpoint must exist in plugin configuration file\n")
	}
	if opt.Encryption != "on" && opt.Encryption != "off" {
		errTxt += fmt.Sprintf("Invalid encryption configuration. Valid choices are on or off.\n")
	}
	if opt.RemoveDuplicateBucket != "true" && opt.RemoveDuplicateBucket != "false" {
		errTxt += fmt.Sprintf("Invalid value for remove_duplicate_bucket. Valid choices are true or false.\n")
	}
	if opt.BackupMultipartChunksize != "" {
		chunkSize, err := bytesize.Parse(opt.BackupMultipartChunksize)
		if err != nil {
			errTxt += fmt.Sprintf("Invalid backup_multipart_chunksize. Err: %s\n", err)
		}
		// The chunk size is converted from uint64 to int64. This is safe as
		// long as the chunk size is smaller than math.MaxInt64 bytes (~9223 petabytes).
		opt.UploadChunkSize = int64(chunkSize)
	}
	if opt.BackupMaxConcurrentRequests != "" {
		opt.UploadConcurrency, err = strconv.Atoi(opt.BackupMaxConcurrentRequests)
		if err != nil {
			errTxt += fmt.Sprintf("Invalid backup_max_concurrent_requests. Err: %s\n", err)
		}
	}
	if opt.RestoreMultipartChunksize != "" {
		chunkSize, err := bytesize.Parse(opt.RestoreMultipartChunksize)
		if err != nil {
			errTxt += fmt.Sprintf("Invalid restore_multipart_chunksize. Err: %s\n", err)
		}
		// The chunk size is converted from uint64 to int64. This is safe as
		// long as the chunk size is smaller than math.MaxInt64 bytes (~9223 petabytes).
		opt.DownloadChunkSize = int64(chunkSize)
	}
	if opt.RestoreMaxConcurrentRequests != "" {
		opt.DownloadConcurrency, err = strconv.Atoi(opt.RestoreMaxConcurrentRequests)
		if err != nil {
			errTxt += fmt.Sprintf("Invalid restore_max_concurrent_requests. Err: %s\n", err)
		}
	}
	if errTxt != "" {
		return errors.New(errTxt)
	}
	return nil
}

// CustomRetryer wraps the SDK's built-in DefaultRetryer
type CustomRetryer struct {
	client.DefaultRetryer
}

// ShouldRetry overrides the SDK's built-in DefaultRetryer
func (r CustomRetryer) ShouldRetry(req *request.Request) bool {
	if r.NumMaxRetries == 0 {
		return false
	}

	willRetry := false
	if req.Error != nil && strings.Contains(req.Error.Error(), "connection reset by peer") {
		willRetry = true
	} else if req.Error != nil && req.HTTPResponse.StatusCode == 404 && strings.Contains(req.Error.Error(), "NoSuchKey") {
		// A 404 NoSuchKey error is possible due to AWS's eventual consistency
		// when attempting to inspect or get a file too quickly after it was
		// uploaded. The s3 plugin does exactly this to determine the number of
		// bytes uploaded. For this reason we retry 404 errors.
		willRetry = true
	} else {
		willRetry = r.DefaultRetryer.ShouldRetry(req)
	}

	if willRetry {
		// While it's possible to let the AWS client log for us, it doesn't seem
		// possible to set it up to only log errors. To prevent our log from
		// filling up with debug logs of successful https requests and
		// responses, we only log when retries are attempted.
		if req.Error != nil {
			gplog.Debug("Https request attempt %d failed. Next attempt in %v. %s\n", req.RetryCount, r.RetryRules(req), req.Error.Error())
		} else {
			gplog.Debug("Https request attempt %d failed. Next attempt in %v.\n", req.RetryCount, r.RetryRules(req))
		}
		return true
	}
	return false
}
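// The *cli.Context handed to readConfigAndStartSession carries the plugin
// config path as its first positional argument. A hypothetical invocation
// (binary and subcommand names are illustrative only, not taken from this
// file) might look like:
//
//	gpbackup_s3_plugin delete_backup /home/gpadmin/s3_config.yaml 20240101123045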
func readConfigAndStartSession(c *cli.Context) (*PluginConfig, *session.Session, error) {
	configPath := c.Args().Get(0)
	config, err := readAndValidatePluginConfig(configPath)
	if err != nil {
		return nil, nil, err
	}
	disableSSL := !ShouldEnableEncryption(config.Options.Encryption)

	awsConfig := request.WithRetryer(aws.NewConfig(), CustomRetryer{DefaultRetryer: client.DefaultRetryer{NumMaxRetries: 10}}).
		WithRegion(config.Options.Region).
		WithEndpoint(config.Options.Endpoint).
		WithS3ForcePathStyle(true).
		WithDisableSSL(disableSSL).
		WithUseDualStack(true)

	// Will use the default credential chain if none is provided
	if config.Options.AwsAccessKeyId != "" {
		awsConfig = awsConfig.WithCredentials(
			credentials.NewStaticCredentials(
				config.Options.AwsAccessKeyId,
				config.Options.AwsSecretAccessKey, ""))
	}

	if config.Options.HttpProxy != "" {
		httpclient := &http.Client{
			Transport: &http.Transport{
				Proxy: func(*http.Request) (*url.URL, error) {
					return url.Parse(config.Options.HttpProxy)
				},
			},
		}
		awsConfig.WithHTTPClient(httpclient)
	}

	sess, err := session.NewSession(awsConfig)
	if err != nil {
		return nil, nil, err
	}
	if config.Options.RemoveDuplicateBucket == "true" {
		sess.Handlers.Build.PushFront(removeBucketFromPath)
	}
	return config, sess, nil
}

func ShouldEnableEncryption(encryption string) bool {
	isOff := strings.EqualFold(encryption, "off")
	return !isOff
}

func isDirectoryGetSize(path string) (bool, int64) {
	fd, err := os.Stat(path)
	if err != nil {
		gplog.FatalOnError(err)
	}
	switch mode := fd.Mode(); {
	case mode.IsDir():
		return true, 0
	case mode.IsRegular():
		return false, fd.Size()
	}
	gplog.FatalOnError(fmt.Errorf("INVALID file %s", path))
	return false, 0
}

func getFileSize(S3 s3iface.S3API, bucket string, fileKey string) (int64, error) {
	req, resp := S3.HeadObjectRequest(&s3.HeadObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(fileKey),
	})
	err := req.Send()
	if err != nil {
		return 0, err
	}
	return *resp.ContentLength, nil
}
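// Worked example for GetS3Path (hypothetical values): with folder "my/folder"
// and path "/data/gpseg0/backups/20240101/20240101123045/gpbackup_0_20240101123045",
// only the last four path segments are kept, yielding
// "my/folder/backups/20240101/20240101123045/gpbackup_0_20240101123045".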
func GetS3Path(folder string, path string) string {
	/*
		A typical path for an already-backed-up file is stored in a parent
		directory of a segment and, beneath that, under a datestamp/timestamp
		hierarchy. We assume the incoming path is a long absolute one.
		For example, from the test bench:
			testdir_for_del="/tmp/testseg/backups/$current_date_for_del/$time_second_for_del"
			testfile_for_del="$testdir_for_del/testfile_$time_second_for_del.txt"

		Therefore, the incoming path is relevant to S3 only in its last four
		segments, which identify the file, its two date/timestamp parents, and
		the grandparent "backups" directory.
	*/
	pathArray := strings.Split(path, "/")
	lastFour := strings.Join(pathArray[(len(pathArray)-4):], "/")
	return fmt.Sprintf("%s/%s", folder, lastFour)
}

func DeleteBackup(c *cli.Context) error {
	timestamp := c.Args().Get(1)
	if timestamp == "" {
		return errors.New("delete requires a <timestamp>")
	}
	if !IsValidTimestamp(timestamp) {
		return fmt.Errorf("delete requires a <timestamp> with format "+
			"YYYYMMDDHHMMSS, but received: %s", timestamp)
	}
	date := timestamp[0:8]
	// Note that the "backups" directory is an artifact of how we save: we keep
	// the 3 parent directories of the source file, so the delete prefix becomes
	//   <s3folder>/backups/<date>/<timestamp>
	config, sess, err := readConfigAndStartSession(c)
	if err != nil {
		return err
	}
	deletePath := filepath.Join(config.Options.Folder, "backups", date, timestamp)
	bucket := config.Options.Bucket
	gplog.Debug("Delete location = s3://%s/%s", bucket, deletePath)

	service := s3.New(sess)
	iter := s3manager.NewDeleteListIterator(service, &s3.ListObjectsInput{
		Bucket: aws.String(bucket),
		Prefix: aws.String(deletePath),
	})
	batchClient := s3manager.NewBatchDeleteWithClient(service)
	return batchClient.Delete(aws.BackgroundContext(), iter)
}

func ListDirectory(c *cli.Context) error {
	var err error
	config, sess, err := readConfigAndStartSession(c)
	if err != nil {
		return err
	}
	bucket := config.Options.Bucket
	var listPath string
	if len(c.Args()) == 2 {
		listPath = c.Args().Get(1)
	} else {
		listPath = config.Options.Folder
	}
	client := s3.New(sess)
	params := &s3.ListObjectsV2Input{Bucket: &bucket, Prefix: &listPath}
	bucketObjectsList, err := client.ListObjectsV2(params)
	if err != nil {
		return err
	}

	fileSizes := make([][]string, 0)
	gplog.Verbose("Retrieving file information from directory %s in S3", listPath)
	for _, key := range bucketObjectsList.Contents {
		if strings.HasSuffix(*key.Key, "/") {
			// Got a directory
			continue
		}
		downloader := s3manager.NewDownloader(sess, func(u *s3manager.Downloader) {
			u.PartSize = config.Options.DownloadChunkSize
		})
		totalBytes, err := getFileSize(downloader.S3, bucket, *key.Key)
		if err != nil {
			return err
		}
		fileSizes = append(fileSizes, []string{*key.Key, fmt.Sprint(totalBytes)})
	}

	// Render the data as a table
	table := tablewriter.NewWriter(operating.System.Stdout)
	columns := []string{"NAME", "SIZE(bytes)"}
	table.SetHeader(columns)
	colors := make([]tablewriter.Colors, len(columns))
	for i := range colors {
		colors[i] = tablewriter.Colors{tablewriter.Bold}
	}
	table.SetHeaderColor(colors...)
	table.SetCenterSeparator(" ")
	table.SetColumnSeparator(" ")
	table.SetRowSeparator(" ")
	table.SetHeaderAlignment(tablewriter.ALIGN_LEFT)
	table.SetHeaderLine(true)
	table.SetAutoFormatHeaders(false)
	table.SetBorders(tablewriter.Border{Left: true, Right: true, Bottom: false, Top: false})
	table.AppendBulk(fileSizes)
	table.Render()
	return err
}

func DeleteDirectory(c *cli.Context) error {
	config, sess, err := readConfigAndStartSession(c)
	if err != nil {
		return err
	}
	deletePath := c.Args().Get(1)
	bucket := config.Options.Bucket
	gplog.Verbose("Deleting directory s3://%s/%s", bucket, deletePath)

	service := s3.New(sess)
	iter := s3manager.NewDeleteListIterator(service, &s3.ListObjectsInput{
		Bucket: aws.String(bucket),
		Prefix: aws.String(deletePath),
	})
	batchClient := s3manager.NewBatchDeleteWithClient(service)
	return batchClient.Delete(aws.BackgroundContext(), iter)
}

func IsValidTimestamp(timestamp string) bool {
	timestampFormat := regexp.MustCompile(`^([0-9]{14})$`)
	return timestampFormat.MatchString(timestamp)
}
// The AWS SDK automatically prepends "/BucketName/" to a request's path, which
// breaks placement of all objects when doing backups or restores with an
// endpoint URL that already directs requests to the correct bucket. To work
// around this, we manually remove the initial bucket reference from the path
// in that case. NOTE: this does not happen if an IP address is used directly,
// so we attempt to parse IP addresses and do not invoke this removal if one is
// found.
func removeBucketFromPath(req *request.Request) {
	req.Operation.HTTPPath = strings.Replace(req.Operation.HTTPPath, "/{Bucket}", "", -1)
	if !strings.HasPrefix(req.Operation.HTTPPath, "/") {
		req.Operation.HTTPPath = "/" + req.Operation.HTTPPath
	}

	req.HTTPRequest.URL.Path = strings.Replace(req.HTTPRequest.URL.Path, "/{Bucket}", "", -1)
	if !strings.HasPrefix(req.HTTPRequest.URL.Path, "/") {
		req.HTTPRequest.URL.Path = "/" + req.HTTPRequest.URL.Path
	}
}
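// Illustration (based only on the string replacement above; the template shown
// is assumed for the example): an operation path such as "/{Bucket}/{Key+}"
// becomes "/{Key+}", so the bucket segment is no longer duplicated in the
// request path when the endpoint already routes to the correct bucket.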