func RestoreDirectoryParallel()

in s3plugin/restore.go [114:196]


// RestoreDirectoryParallel downloads every object stored under an S3 key
// prefix into a local directory, using a pool of concurrent workers.
//
// Arguments are read from c: argument 1 is the directory name, used both as
// the S3 key prefix and as the local destination directory. When exactly three
// arguments are present, argument 2 is the number of workers; otherwise five
// workers are used. The bucket comes from the plugin configuration.
//
// Keys ending in "/" are skipped. Every other object is written to
// <dirName>/<base name of key>, so nested keys are flattened into one
// directory.
//
// The first error encountered (invalid worker count, directory creation,
// listing, file creation, download or close) is returned. After a failure the
// workers stop picking up new files, and the partially written file is removed.
func RestoreDirectoryParallel(c *cli.Context) error {
	start := time.Now()
	parallel := 5
	config, sess, err := readConfigAndStartSession(c)
	if err != nil {
		return err
	}
	dirName := c.Args().Get(1)
	if len(c.Args()) == 3 {
		parallelArg := c.Args().Get(2)
		parallel, err = strconv.Atoi(parallelArg)
		if err != nil {
			return fmt.Errorf("invalid parallelism %q: %v", parallelArg, err)
		}
		// With no workers nothing would ever be downloaded, so reject it
		// up front instead of silently restoring nothing.
		if parallel < 1 {
			return fmt.Errorf("invalid parallelism %d: must be at least 1", parallel)
		}
	}
	bucket := config.Options.Bucket
	gplog.Verbose("Restore Directory Parallel '%s' from S3", dirName)
	gplog.Verbose("S3 Location = s3://%s/%s", bucket, dirName)
	fmt.Printf("dirKey = %s\n", dirName)

	if err = os.MkdirAll(dirName, 0775); err != nil {
		return err
	}

	// Create a list of files to be restored. A single ListObjectsV2 call
	// returns at most one page of keys, so walk every page.
	client := s3.New(sess)
	params := &s3.ListObjectsV2Input{Bucket: &bucket, Prefix: &dirName}
	var fileList []string
	err = client.ListObjectsV2Pages(params,
		func(page *s3.ListObjectsV2Output, lastPage bool) bool {
			for _, obj := range page.Contents {
				if obj.Key == nil {
					continue
				}
				var size int64
				if obj.Size != nil {
					size = *obj.Size
				}
				gplog.Verbose("File '%s' = %d bytes", filepath.Base(*obj.Key), size)
				if strings.HasSuffix(*obj.Key, "/") {
					// Got a directory
					continue
				}
				fileList = append(fileList, *obj.Key)
			}
			return true
		})
	if err != nil {
		return err
	}

	var (
		wg         sync.WaitGroup
		mu         sync.Mutex // guards firstErr, numFiles and totalBytes
		firstErr   error
		numFiles   int
		totalBytes int64
	)

	// Queue every job up front. The channel is buffered to hold all of them
	// and closed immediately, so workers may stop early without blocking
	// anyone.
	jobs := make(chan string, len(fileList))
	for _, fileKey := range fileList {
		jobs <- fileKey
	}
	close(jobs)

	// restoreOne downloads a single object into dirName and updates the
	// shared counters on success.
	restoreOne := func(fileKey string) error {
		filePath := filepath.Join(dirName, filepath.Base(fileKey))
		file, err := os.Create(filePath)
		if err != nil {
			return err
		}
		bytes, elapsed, err := downloadFile(sess, config, bucket, fileKey, file)
		// A failed Close can mean the data never reached disk, so treat it
		// like a failed download.
		if closeErr := file.Close(); err == nil {
			err = closeErr
		}
		if err != nil {
			// Best-effort cleanup of the partial file; the download error
			// is the one worth reporting.
			_ = os.Remove(filePath)
			return err
		}

		mu.Lock()
		totalBytes += bytes
		numFiles++
		mu.Unlock()

		msg := fmt.Sprintf("Downloaded %d bytes for %s in %v", bytes,
			filepath.Base(fileKey), elapsed.Round(time.Millisecond))
		// msg may contain '%' from the file name; never use it as a format.
		gplog.Verbose("%s", msg)
		fmt.Println(msg)
		return nil
	}

	// Process the files in parallel. The WaitGroup tracks workers rather than
	// jobs, so Wait returns even when a worker bails out early.
	if parallel > len(fileList) {
		parallel = len(fileList)
	}
	for i := 0; i < parallel; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for fileKey := range jobs {
				mu.Lock()
				failed := firstErr != nil
				mu.Unlock()
				if failed {
					// Another worker already failed; stop starting new work.
					return
				}
				if err := restoreOne(fileKey); err != nil {
					mu.Lock()
					if firstErr == nil {
						firstErr = err
					}
					mu.Unlock()
					return
				}
			}
		}()
	}
	// Wait for all workers to finish
	wg.Wait()

	fmt.Printf("Downloaded %d files (%d bytes) in %v\n",
		numFiles, totalBytes, time.Since(start).Round(time.Millisecond))
	return firstErr
}