func()

in cli_tools/gce_onestep_image_import/onestep_importer/aws_importer.go [454:524]


func (importer *awsImporter) transferFile(writer io.WriteCloser) error {
	if importer.transferFileFn != nil {
		return importer.transferFileFn()
	}
	// 1. Set up download size and get number of chunks to download
	output, err := humanize.ParseBytes(downloadBufSize)
	if err != nil {
		return daisy.ToDError(err)
	}
	readSize := int64(output)
	// Take ceiling to get number of chunks to download.
	readers := (importer.args.exportFileSize-1)/readSize + 1
	// Set up download retry delay interval
	delayTime := []int{1, 2, 4, 8, 8}
	maxRetryTimes := len(delayTime)

	// 2. Set up upload info
	importer.uploader = importer.getUploader(writer)
	importer.uploader.Add(1)
	go importer.uploader.uploadFile()

	// 3. Range download
	for i := int64(0); i < readers; i++ {
		startRange := i * readSize
		endRange := startRange + readSize - 1
		for retryAttempt := 0; ; retryAttempt++ {
			res, err := importer.s3Client.GetObject(&s3.GetObjectInput{
				Bucket: aws.String(importer.args.exportBucket),
				Key:    aws.String(importer.args.exportKey),
				Range:  aws.String(fmt.Sprintf("bytes=%v-%v", startRange, endRange)),
			})
			if err != nil {
				if retryAttempt >= maxRetryTimes {
					return daisy.Errf("error in downloading from %v: %v", importer.args.sourceFilePath, err)
				}
				time.Sleep(time.Duration(delayTime[retryAttempt]) * time.Second)
				continue
			}
			importer.uploader.readerChan <- res.Body
			break
		}

		// Stop downloading as soon as one of the upload fails.
		select {
		case err := <-importer.uploader.uploadErrChan:
			importer.uploader.cleanup()
			return err
		default:
			// No error, continue to download.
		}

		// Stop downloading if timeout exceeded.
		select {
		case <-importer.timeoutChan:
			importer.uploader.cleanup()
			return daisy.Errf("timeout exceeded during transfer file")
		default:
			// Did not timeout, continue to download.
		}
	}

	// All file chunks are downloaded, wait for upload to finish.
	close(importer.uploader.readerChan)
	importer.uploader.Wait()

	err = importer.uploader.writer.Close()
	if err != nil {
		return daisy.ToDError(err)
	}
	return nil
}