func downloadFileInParallel()

in s3plugin/restore.go [257:357]

downloadFileInParallel splits an S3 object into fixed-size byte ranges, downloads those ranges with a pool of worker goroutines, and copies each chunk into the local file in order. It returns the total byte count, the elapsed wall-clock time, and any error raised while downloading or writing.

func downloadFileInParallel(sess *session.Session, downloadConcurrency int, downloadChunkSize int64,
	totalBytes int64, bucket string, fileKey string, file *os.File) (int64, time.Duration, error) {

	var finalErr error
	start := time.Now()
	waitGroup := sync.WaitGroup{}
	numberOfChunks := int((totalBytes + downloadChunkSize - 1) / downloadChunkSize)
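	// bufferPointers holds each downloaded chunk until the writer goroutine copies it to disk,
	// copyChannel[i] signals that chunk i is ready to be written, and jobs carries the chunk
	// descriptors consumed by the worker pool.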
	bufferPointers := make([]*[]byte, numberOfChunks)
	copyChannel := make([]chan int, numberOfChunks)
	jobs := make(chan chunk, numberOfChunks)
	for i := 0; i < numberOfChunks; i++ {
		copyChannel[i] = make(chan int)
	}

	startByte := int64(0)
	endByte := int64(-1)
	done := false
	// Create jobs based on the number of chunks to be downloaded
	for chunkIndex := 0; chunkIndex < numberOfChunks && !done; chunkIndex++ {
		startByte = endByte + 1
		endByte += downloadChunkSize
		if endByte >= totalBytes {
			endByte = totalBytes - 1
			done = true
		}
		jobs <- chunk{chunkIndex, startByte, endByte}
		waitGroup.Add(1)
	}

	// Create a pool of download workers (based on concurrency)
	numberOfWorkers := downloadConcurrency
	if numberOfChunks < downloadConcurrency {
		numberOfWorkers = numberOfChunks
	}
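	// Pre-allocate one reusable download buffer per worker; buffers cycle through this
	// channel between the workers and the writer goroutine.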
	downloadBuffers := make(chan []byte, numberOfWorkers)
	for i := 0; i < cap(downloadBuffers); i++ {
		buffer := make([]byte, downloadChunkSize)
		downloadBuffers <- buffer
	}
	// Concurrency is handled by our own worker pool, so each SDK downloader call runs with a single part at a time
	downloader := s3manager.NewDownloader(sess, func(u *s3manager.Downloader) {
		u.PartSize = downloadChunkSize
		u.Concurrency = 1
	})
	gplog.Debug("Downloading file %s with chunksize %d and concurrency %d",
		filepath.Base(fileKey), downloadChunkSize, numberOfWorkers)

	for i := 0; i < numberOfWorkers; i++ {
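		// Each worker pulls chunk jobs, downloads each byte range into a pooled buffer, and
		// signals the writer goroutine through that chunk's copy channel.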
		go func(id int) {
			for j := range jobs {
				buffer := <-downloadBuffers
				chunkStart := time.Now()
				byteRange := fmt.Sprintf("bytes=%d-%d", j.startByte, j.endByte)
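				// The final chunk is typically shorter than downloadChunkSize, so allocate a
				// right-sized buffer for it instead of reusing the full-size pooled buffer.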
				if j.endByte-j.startByte+1 != downloadChunkSize {
					buffer = make([]byte, j.endByte-j.startByte+1)
				}
				bufferPointers[j.chunkIndex] = &buffer
				gplog.Debug("Worker %d (chunk %d) for %s with partsize %d and concurrency %d",
					id, j.chunkIndex, filepath.Base(fileKey),
					downloader.PartSize, downloader.Concurrency)
				chunkBytes, err := downloader.Download(
					aws.NewWriteAtBuffer(buffer),
					&s3.GetObjectInput{
						Bucket: aws.String(bucket),
						Key:    aws.String(fileKey),
						Range:  aws.String(byteRange),
					})
				if err != nil {
					finalErr = err
				}
				gplog.Debug("Worker %d Downloaded %d bytes (chunk %d) for %s in %v",
					id, chunkBytes, j.chunkIndex, filepath.Base(fileKey),
					time.Since(chunkStart).Round(time.Millisecond))
				copyChannel[j.chunkIndex] <- j.chunkIndex
			}
		}(i)
	}

	// Copy data from download buffers into the output stream sequentially
	go func() {
		for i := range copyChannel {
			currentChunk := <-copyChannel[i]
			chunkStart := time.Now()
			numBytes, err := file.Write(*bufferPointers[currentChunk])
			if err != nil {
				finalErr = err
			}
			gplog.Debug("Copied %d bytes (chunk %d) for %s in %v",
				numBytes, currentChunk, filepath.Base(fileKey),
				time.Since(chunkStart).Round(time.Millisecond))
			// Return the buffer to the pool and drop this chunk's reference to it
			downloadBuffers <- *bufferPointers[currentChunk]
			bufferPointers[currentChunk] = nil
			waitGroup.Done()
			close(copyChannel[i])
		}
	}()

	waitGroup.Wait()
	return totalBytes, time.Since(start), errors.Wrap(finalErr, fmt.Sprintf("Error while downloading %s", fileKey))
}
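
For orientation, a minimal caller sketch follows; restoreExample, the 6-worker and 8 MiB tuning values, and the HeadObject size lookup are illustrative assumptions rather than code from the plugin, and the aws, s3, session, os, and gplog imports already used by restore.go are assumed to be in scope.

// restoreExample is a hypothetical caller, not part of restore.go; it only illustrates the
// parameters downloadFileInParallel expects.
func restoreExample(sess *session.Session, bucket string, fileKey string, localPath string) error {
	// Ask S3 for the object's size so the chunk count can be computed up front.
	head, err := s3.New(sess).HeadObject(&s3.HeadObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(fileKey),
	})
	if err != nil {
		return err
	}
	file, err := os.Create(localPath)
	if err != nil {
		return err
	}
	defer file.Close()
	// 6 workers and 8 MiB chunks are illustrative tuning values, not plugin defaults.
	bytes, elapsed, err := downloadFileInParallel(sess, 6, 8*1024*1024,
		*head.ContentLength, bucket, fileKey, file)
	if err != nil {
		return err
	}
	gplog.Info("Downloaded %d bytes in %v", bytes, elapsed)
	return nil
}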