in s3plugin/backup.go [109:175]
// BackupDirectoryParallel uploads every non-directory entry found under the
// directory named by the second CLI argument (index 1) to S3, using a pool of
// worker goroutines.
//
// The pool size defaults to 5. When exactly three CLI arguments are supplied,
// the third (index 2) overrides it and must be a positive integer.
//
// It returns an error if the configuration/session cannot be set up, if the
// worker count is invalid, if walking the directory fails, or if any file
// cannot be opened or uploaded. After the first failure, workers stop picking
// up new files and that first error is returned.
func BackupDirectoryParallel(c *cli.Context) error {
	start := time.Now()
	parallel := 5
	config, sess, err := readConfigAndStartSession(c)
	if err != nil {
		return err
	}
	dirName := c.Args().Get(1)
	if len(c.Args()) == 3 {
		rawParallel := c.Args().Get(2)
		parallel, err = strconv.Atoi(rawParallel)
		if err != nil {
			return fmt.Errorf("invalid parallel value %q: %v", rawParallel, err)
		}
		// Zero or negative would start no workers and leave the queue unprocessed.
		if parallel < 1 {
			return fmt.Errorf("invalid parallel value %d: must be at least 1", parallel)
		}
	}
	gplog.Verbose("Backup Directory '%s' to S3", dirName)
	gplog.Verbose("S3 Location = s3://%s/%s", config.Options.Bucket, dirName)
	gplog.Info("dirKey = %s\n", dirName)

	// Populate a list of files to be backed up.
	var fileList []string
	err = filepath.Walk(dirName, func(path string, _ os.FileInfo, walkErr error) error {
		if walkErr != nil {
			// Surface unreadable or missing paths instead of queueing them.
			return walkErr
		}
		if isDir, _ := isDirectoryGetSize(path); !isDir {
			fileList = append(fileList, path)
		}
		return nil
	})
	if err != nil {
		return err
	}

	// Queue every job up front; the buffer holds the whole list, so filling it
	// never blocks, and closing it lets workers exit once the queue is empty.
	fileChannel := make(chan string, len(fileList))
	for _, fileKey := range fileList {
		fileChannel <- fileKey
	}
	close(fileChannel)

	// uploadOne opens a single file, uploads it, and always closes it.
	uploadOne := func(fileKey string) (int64, time.Duration, error) {
		file, err := os.Open(fileKey)
		if err != nil {
			return 0, 0, err
		}
		defer func() { _ = file.Close() }() // read-only handle; close error is not actionable
		return uploadFile(sess, config, fileKey, file)
	}

	var (
		wg         sync.WaitGroup
		mu         sync.Mutex // guards totalBytes and finalErr
		totalBytes int64
		finalErr   error
	)

	// Process the files in parallel. The WaitGroup counts workers rather than
	// jobs, so Wait returns even when a worker stops early because of an error.
	wg.Add(parallel)
	for i := 0; i < parallel; i++ {
		go func() {
			defer wg.Done()
			for fileKey := range fileChannel {
				mu.Lock()
				failed := finalErr != nil
				mu.Unlock()
				if failed {
					return
				}

				n, elapsed, err := uploadOne(fileKey)
				if err != nil {
					mu.Lock()
					if finalErr == nil {
						finalErr = err
					}
					mu.Unlock()
					return
				}

				mu.Lock()
				totalBytes += n
				mu.Unlock()

				msg := fmt.Sprintf("Uploaded %d bytes for %s in %v", n,
					filepath.Base(fileKey), elapsed.Round(time.Millisecond))
				// Pass msg as an argument: file names may contain '%'.
				gplog.Verbose("%s", msg)
				fmt.Println(msg)
			}
		}()
	}

	// Wait for all workers to finish.
	wg.Wait()
	if finalErr != nil {
		return finalErr
	}
	gplog.Info("Uploaded %d files (%d bytes) in %v\n",
		len(fileList), totalBytes, time.Since(start).Round(time.Millisecond))
	return nil
}