in s3plugin/restore.go [114:196]
func RestoreDirectoryParallel(c *cli.Context) error {
	start := time.Now()
	totalBytes := int64(0)
	parallel := 5 // default number of concurrent downloads
	config, sess, err := readConfigAndStartSession(c)
	if err != nil {
		return err
	}
	dirName := c.Args().Get(1)
	if len(c.Args()) == 3 {
		// Optional third argument overrides the parallelism; ignore it unless
		// it is a positive integer so we never start zero workers.
		if n, err := strconv.Atoi(c.Args().Get(2)); err == nil && n > 0 {
			parallel = n
		}
	}
	bucket := config.Options.Bucket
	gplog.Verbose("Restore Directory Parallel '%s' from S3", dirName)
	gplog.Verbose("S3 Location = s3://%s/%s", bucket, dirName)
	fmt.Printf("dirKey = %s\n", dirName)
	_ = os.MkdirAll(dirName, 0775)
	client := s3.New(sess)
	params := &s3.ListObjectsV2Input{Bucket: &bucket, Prefix: &dirName}
	bucketObjectsList, err := client.ListObjectsV2(params)
	if err != nil {
		return err
	}
	// Create a list of files to be restored
	numFiles := 0
	fileList := make([]string, 0)
	for _, key := range bucketObjectsList.Contents {
		gplog.Verbose("File '%s' = %d bytes", filepath.Base(*key.Key), *key.Size)
		if strings.HasSuffix(*key.Key, "/") {
			// Got a directory placeholder key, skip it
			continue
		}
		fileList = append(fileList, *key.Key)
	}
	var wg sync.WaitGroup
	var mu sync.Mutex // guards totalBytes, numFiles and finalErr across workers
	var finalErr error
	// Create jobs using a channel
	fileChannel := make(chan string, len(fileList))
	for _, fileKey := range fileList {
		wg.Add(1)
		fileChannel <- fileKey
	}
	close(fileChannel)
	// Process the files in parallel
	for i := 0; i < parallel; i++ {
		go func(jobs chan string) {
			for fileKey := range jobs {
				fileName := fileKey
				if strings.Contains(fileKey, "/") {
					fileName = filepath.Base(fileKey)
				}
				// Construct the local file name
				filePath := dirName + "/" + fileName
				file, err := os.Create(filePath)
				if err != nil {
					// Record the error and keep draining jobs so wg.Wait() cannot hang
					mu.Lock()
					finalErr = err
					mu.Unlock()
					wg.Done()
					continue
				}
				bytes, elapsed, err := downloadFile(sess, config, bucket, fileKey, file)
				_ = file.Close()
				if err == nil {
					mu.Lock()
					totalBytes += bytes
					numFiles++
					mu.Unlock()
					msg := fmt.Sprintf("Downloaded %d bytes for %s in %v", bytes,
						filepath.Base(fileKey), elapsed.Round(time.Millisecond))
					gplog.Verbose(msg)
					fmt.Println(msg)
				} else {
					mu.Lock()
					finalErr = err
					mu.Unlock()
					// Remove the partial file before FatalOnError aborts the process
					_ = os.Remove(filePath)
					gplog.FatalOnError(err)
				}
				wg.Done()
			}
		}(fileChannel)
	}
	// Wait for jobs to be done
	wg.Wait()
	fmt.Printf("Downloaded %d files (%d bytes) in %v\n",
		numFiles, totalBytes, time.Since(start).Round(time.Millisecond))
	return finalErr
}
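
One limitation worth noting: client.ListObjectsV2 returns at most 1,000 keys per call, and the function above consumes only that first page, so larger backup directories would be restored incompletely. Below is a minimal sketch of page-aware listing using the same SDK's ListObjectsV2Pages; listDirectoryKeys is a hypothetical helper shown for illustration, not part of the plugin.

import (
	"strings"

	"github.com/aws/aws-sdk-go/service/s3"
)

// listDirectoryKeys walks every page of the bucket listing and collects the
// object keys under prefix, skipping "directory" placeholder keys that end
// in "/". (Hypothetical helper, assuming aws-sdk-go v1.)
func listDirectoryKeys(client *s3.S3, bucket, prefix string) ([]string, error) {
	keys := make([]string, 0)
	input := &s3.ListObjectsV2Input{Bucket: &bucket, Prefix: &prefix}
	err := client.ListObjectsV2Pages(input,
		func(page *s3.ListObjectsV2Output, lastPage bool) bool {
			for _, obj := range page.Contents {
				if strings.HasSuffix(*obj.Key, "/") {
					continue
				}
				keys = append(keys, *obj.Key)
			}
			return true // keep requesting pages until the listing is exhausted
		})
	return keys, err
}

Swapping a helper like this in for the single ListObjectsV2 call would also surface listing errors to the caller instead of silently producing an empty file list.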