in s3plugin/restore.go [257:357]
func downloadFileInParallel(sess *session.Session, downloadConcurrency int, downloadChunkSize int64,
	totalBytes int64, bucket string, fileKey string, file *os.File) (int64, time.Duration, error) {
	var finalErr error
	start := time.Now()
	waitGroup := sync.WaitGroup{}
	numberOfChunks := int((totalBytes + downloadChunkSize - 1) / downloadChunkSize)
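	// bufferPointers holds each downloaded chunk until it is written out;
	// copyChannel[i] signals that chunk i is ready to be copied to the file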
	bufferPointers := make([]*[]byte, numberOfChunks)
	copyChannel := make([]chan int, numberOfChunks)
	jobs := make(chan chunk, numberOfChunks)
	for i := 0; i < numberOfChunks; i++ {
		copyChannel[i] = make(chan int)
	}
	startByte := int64(0)
	endByte := int64(-1)
	done := false
	// Create jobs based on the number of chunks to be downloaded
	for chunkIndex := 0; chunkIndex < numberOfChunks && !done; chunkIndex++ {
		startByte = endByte + 1
		endByte += downloadChunkSize
		if endByte >= totalBytes {
			endByte = totalBytes - 1
			done = true
		}
		jobs <- chunk{chunkIndex, startByte, endByte}
		waitGroup.Add(1)
	}
	// All jobs are queued, so close the channel and let the workers exit once it is drained
	close(jobs)
	// Create a pool of download workers (based on concurrency)
	numberOfWorkers := downloadConcurrency
	if numberOfChunks < downloadConcurrency {
		numberOfWorkers = numberOfChunks
	}
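	// Pre-allocate one reusable download buffer per worker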
	downloadBuffers := make(chan []byte, numberOfWorkers)
	for i := 0; i < cap(downloadBuffers); i++ {
		buffer := make([]byte, downloadChunkSize)
		downloadBuffers <- buffer
	}
	// Concurrency across chunks is handled by our own worker pool, so each downloader call is limited to a single part at a time
	downloader := s3manager.NewDownloader(sess, func(u *s3manager.Downloader) {
		u.PartSize = downloadChunkSize
		u.Concurrency = 1
	})
	gplog.Debug("Downloading file %s with chunksize %d and concurrency %d",
		filepath.Base(fileKey), downloadChunkSize, numberOfWorkers)
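	// Start the workers; each one takes a chunk job, borrows a buffer from the pool, and downloads that byte range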
	for i := 0; i < numberOfWorkers; i++ {
		go func(id int) {
			for j := range jobs {
				buffer := <-downloadBuffers
				chunkStart := time.Now()
				byteRange := fmt.Sprintf("bytes=%d-%d", j.startByte, j.endByte)
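				// The final chunk may be shorter than downloadChunkSize, so size its buffer to the exact byte range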
				if j.endByte-j.startByte+1 != downloadChunkSize {
					buffer = make([]byte, j.endByte-j.startByte+1)
				}
				bufferPointers[j.chunkIndex] = &buffer
				gplog.Debug("Worker %d (chunk %d) for %s with partsize %d and concurrency %d",
					id, j.chunkIndex, filepath.Base(fileKey),
					downloader.PartSize, downloader.Concurrency)
				chunkBytes, err := downloader.Download(
					aws.NewWriteAtBuffer(buffer),
					&s3.GetObjectInput{
						Bucket: aws.String(bucket),
						Key:    aws.String(fileKey),
						Range:  aws.String(byteRange),
					})
				if err != nil {
					finalErr = err
				}
				gplog.Debug("Worker %d Downloaded %d bytes (chunk %d) for %s in %v",
					id, chunkBytes, j.chunkIndex, filepath.Base(fileKey),
					time.Since(chunkStart).Round(time.Millisecond))
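				// Signal the copier goroutine that this chunk is ready to be written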
				copyChannel[j.chunkIndex] <- j.chunkIndex
			}
		}(i)
	}
	// Copy data from download buffers into the output stream sequentially
	go func() {
		for i := range copyChannel {
			currentChunk := <-copyChannel[i]
			chunkStart := time.Now()
			numBytes, err := file.Write(*bufferPointers[currentChunk])
			if err != nil {
				finalErr = err
			}
			gplog.Debug("Copied %d bytes (chunk %d) for %s in %v",
				numBytes, currentChunk, filepath.Base(fileKey),
				time.Since(chunkStart).Round(time.Millisecond))
			// Return the buffer to the pool so a worker can reuse it
			downloadBuffers <- *bufferPointers[currentChunk]
			bufferPointers[currentChunk] = nil
			waitGroup.Done()
			close(copyChannel[i])
		}
	}()
	waitGroup.Wait()
	return totalBytes, time.Since(start), errors.Wrap(finalErr, fmt.Sprintf("Error while downloading %s", fileKey))
}
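
A minimal call-site sketch for downloadFileInParallel, assuming the same aws-sdk-go (aws, session, s3) and gplog imports used above plus os. The bucket, key, destination path, chunk size (8 MiB), and concurrency (6) are made-up example values, restoreOneFile is a hypothetical helper, and the object size is looked up here with a HeadObject call; the plugin's real restore path may obtain these values differently (for example from its config file).

// restoreOneFile is a hypothetical caller, not part of the plugin; values are example placeholders.
func restoreOneFile() error {
	sess, err := session.NewSession(&aws.Config{Region: aws.String("us-east-1")})
	if err != nil {
		return err
	}
	bucket, fileKey := "my-backup-bucket", "backups/20240101/gpbackup_0_table.gz"
	// Look up the object size so downloadFileInParallel can compute its byte ranges
	head, err := s3.New(sess).HeadObject(&s3.HeadObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(fileKey),
	})
	if err != nil {
		return err
	}
	file, err := os.Create("/tmp/gpbackup_0_table.gz")
	if err != nil {
		return err
	}
	defer file.Close()
	// Download with 6 workers and 8 MiB chunks (example values only)
	size, elapsed, err := downloadFileInParallel(sess, 6, 8*1024*1024,
		*head.ContentLength, bucket, fileKey, file)
	if err != nil {
		return err
	}
	gplog.Info("Restored %d bytes in %v", size, elapsed)
	return nil
}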