s3plugin/backup.go
package s3plugin

import (
	"bufio"
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"github.com/cloudberrydb/gp-common-go-libs/gplog"
	"github.com/pkg/errors"
	"github.com/urfave/cli"
)
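
// SetupPluginForBackup verifies that the plugin can write to the backup
// location by uploading an empty "gpbackup_<timestamp>_report" probe file.
// It is a no-op for scopes other than coordinator/master or segment host.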
func SetupPluginForBackup(c *cli.Context) error {
	scope := (Scope)(c.Args().Get(2))
	if scope != Master && scope != Coordinator && scope != SegmentHost {
		return nil
	}
	config, sess, err := readConfigAndStartSession(c)
	if err != nil {
		return err
	}
	localBackupDir := c.Args().Get(1)
	_, timestamp := filepath.Split(localBackupDir)
	testFileName := fmt.Sprintf("gpbackup_%s_report", timestamp)
	testFilePath := fmt.Sprintf("%s/%s", localBackupDir, testFileName)
	fileKey := GetS3Path(config.Options.Folder, testFilePath)
	file, err := os.Create("/tmp/" + testFileName) // dummy empty reader for probe
	if err != nil {
		return err
	}
	defer file.Close()
	_, _, err = uploadFile(sess, config, fileKey, file)
	return err
}
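
// BackupFile uploads a single local file to the configured S3 bucket and
// logs the bytes transferred and the elapsed time.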
func BackupFile(c *cli.Context) error {
	config, sess, err := readConfigAndStartSession(c)
	if err != nil {
		return err
	}
	fileName := c.Args().Get(1)
	fileKey := GetS3Path(config.Options.Folder, fileName)
	file, err := os.Open(fileName)
	if err != nil {
		return err
	}
	defer file.Close()
	bytes, elapsed, err := uploadFile(sess, config, fileKey, file)
	if err != nil {
		return err
	}
	gplog.Info("Uploaded %d bytes for %s in %v", bytes, filepath.Base(fileKey),
		elapsed.Round(time.Millisecond))
	return nil
}
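
// BackupDirectory walks a local directory and uploads each regular file to S3
// sequentially, accumulating the total number of bytes transferred.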
func BackupDirectory(c *cli.Context) error {
	start := time.Now()
	totalBytes := int64(0)
	config, sess, err := readConfigAndStartSession(c)
	if err != nil {
		return err
	}
	dirName := c.Args().Get(1)
	gplog.Verbose("Backup Directory '%s' to S3", dirName)
	gplog.Verbose("S3 Location = s3://%s/%s", config.Options.Bucket, dirName)
	gplog.Info("dirKey = %s\n", dirName)
	// Populate a list of files to be backed up
	fileList := make([]string, 0)
	_ = filepath.Walk(dirName, func(path string, f os.FileInfo, err error) error {
		isDir, _ := isDirectoryGetSize(path)
		if !isDir {
			fileList = append(fileList, path)
		}
		return nil
	})
	// Process the files sequentially
	for _, fileName := range fileList {
		file, err := os.Open(fileName)
		if err != nil {
			return err
		}
		bytes, elapsed, err := uploadFile(sess, config, fileName, file)
		_ = file.Close()
		if err != nil {
			return err
		}
		totalBytes += bytes
		gplog.Debug("Uploaded %d bytes for %s in %v", bytes,
			filepath.Base(fileName), elapsed.Round(time.Millisecond))
	}
	gplog.Info("Uploaded %d files (%d bytes) in %v\n", len(fileList),
		totalBytes, time.Since(start).Round(time.Millisecond))
	return nil
}
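
// BackupDirectoryParallel uploads the files under a local directory to S3
// using a pool of worker goroutines (5 by default, or the count passed as the
// third CLI argument).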
func BackupDirectoryParallel(c *cli.Context) error {
	start := time.Now()
	totalBytes := int64(0)
	parallel := 5
	config, sess, err := readConfigAndStartSession(c)
	if err != nil {
		return err
	}
	dirName := c.Args().Get(1)
	if len(c.Args()) == 3 {
		parallel, _ = strconv.Atoi(c.Args().Get(2))
	}
	gplog.Verbose("Backup Directory '%s' to S3", dirName)
	gplog.Verbose("S3 Location = s3://%s/%s", config.Options.Bucket, dirName)
	gplog.Info("dirKey = %s\n", dirName)
	// Populate a list of files to be backed up
	fileList := make([]string, 0)
	_ = filepath.Walk(dirName, func(path string, f os.FileInfo, err error) error {
		isDir, _ := isDirectoryGetSize(path)
		if !isDir {
			fileList = append(fileList, path)
		}
		return nil
	})
	var wg sync.WaitGroup
	var mu sync.Mutex // guards totalBytes and finalErr across workers
	var finalErr error
	// Create jobs using a channel
	fileChannel := make(chan string, len(fileList))
	for _, fileKey := range fileList {
		wg.Add(1)
		fileChannel <- fileKey
	}
	close(fileChannel)
	// Process the files in parallel
	for i := 0; i < parallel; i++ {
		go func(jobs chan string) {
			for fileKey := range jobs {
				file, err := os.Open(fileKey)
				if err != nil {
					mu.Lock()
					finalErr = err
					mu.Unlock()
					wg.Done()
					continue
				}
				bytes, elapsed, err := uploadFile(sess, config, fileKey, file)
				if err == nil {
					mu.Lock()
					totalBytes += bytes
					mu.Unlock()
					msg := fmt.Sprintf("Uploaded %d bytes for %s in %v", bytes,
						filepath.Base(fileKey), elapsed.Round(time.Millisecond))
					gplog.Verbose(msg)
					fmt.Println(msg)
				} else {
					mu.Lock()
					finalErr = err
					mu.Unlock()
					gplog.FatalOnError(err)
				}
				_ = file.Close()
				wg.Done()
			}
		}(fileChannel)
	}
	// Wait for jobs to be done
	wg.Wait()
	gplog.Info("Uploaded %d files (%d bytes) in %v\n",
		len(fileList), totalBytes, time.Since(start).Round(time.Millisecond))
	return finalErr
}
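
// BackupData streams backup data from stdin to the S3 key derived from the
// given data file path.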
func BackupData(c *cli.Context) error {
	config, sess, err := readConfigAndStartSession(c)
	if err != nil {
		return err
	}
	dataFile := c.Args().Get(1)
	fileKey := GetS3Path(config.Options.Folder, dataFile)
	bytes, elapsed, err := uploadFile(sess, config, fileKey, os.Stdin)
	if err != nil {
		return err
	}
	gplog.Debug("Uploaded %d bytes for file %s in %v", bytes,
		filepath.Base(fileKey), elapsed.Round(time.Millisecond))
	return nil
}
func uploadFile(sess *session.Session, config *PluginConfig, fileKey string,
	file *os.File) (int64, time.Duration, error) {
	start := time.Now()
	bucket := config.Options.Bucket
	uploadChunkSize := config.Options.UploadChunkSize
	uploadConcurrency := config.Options.UploadConcurrency
	uploader := s3manager.NewUploader(sess, func(u *s3manager.Uploader) {
		u.PartSize = uploadChunkSize
		u.Concurrency = uploadConcurrency
	})
	gplog.Debug("Uploading file %s with chunksize %d and concurrency %d",
		filepath.Base(fileKey), uploader.PartSize, uploader.Concurrency)
	_, err := uploader.Upload(&s3manager.UploadInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(fileKey),
		// This buffer will cause memory issues if
		// segments_per_host*uploadChunkSize*uploadConcurrency is larger than
		// the amount of RAM the system has.
		Body: bufio.NewReaderSize(file, int(uploadChunkSize)*uploadConcurrency),
	})
	if err != nil {
		return 0, -1, errors.Wrap(err, fmt.Sprintf("Error while uploading %s", fileKey))
	}
	bytes, err := getFileSize(uploader.S3, bucket, fileKey)
	return bytes, time.Since(start), err
}