func Upload()

in upload/upload.go [40:133]


// Upload reads the ranges listed in cxt.UploadableRanges from cxt.VhdStream and
// writes each of them with PutPage to the blob cxt.BlobName in the container
// cxt.ContainerName. The individual PutPage calls are wrapped in work requests
// and handed to a load-balancer created with cxt.Parallelism workers, while a
// progress tracker prints the upload status.
//
// It returns nil when every range was uploaded. It returns the error received
// from the stream reader when reading the disk fails, and an "Upload
// Incomplete" error when at least one worker reported an error (this one takes
// precedence over a stream read error).
func Upload(cxt *DiskUploadContext) error {
	// Get the channel that contains stream of disk data to upload
	dataWithRangeChan, streamReadErrChan := GetDataWithRanges(cxt.VhdStream, cxt.UploadableRanges)

	// The unbuffered channel used to send upload requests to the load-balancer
	requestChan := make(chan *concurrent.Request)

	// Prepare and start the load-balancer that load request across 'cxt.Parallelism' workers
	loadBalancer := concurrent.NewBalancer(cxt.Parallelism)
	loadBalancer.Init()
	workerErrorChan, allWorkersFinishedChan := loadBalancer.Run(requestChan)

	// Calculate the actual size of the data to upload
	uploadSizeInBytes := int64(0)
	for _, r := range cxt.UploadableRanges {
		uploadSizeInBytes += r.Length()
	}
	fmt.Printf("\nEffective upload size: %.2f MB (from %.2f MB originally)", float64(uploadSizeInBytes)/oneMB, float64(cxt.VhdStream.GetSize())/oneMB)

	// Prepare and start the upload progress tracker
	uploadProgress := progress.NewStatus(cxt.Parallelism, cxt.AlreadyProcessedBytes, uploadSizeInBytes, progress.NewComputestateDefaultSize())
	progressChan := uploadProgress.Run()

	// read progress status from progress tracker and print it
	go readAndPrintProgress(progressChan, cxt.Resume)

	// Listen for errors reported by workers and print them. The success flag is
	// owned by the listener goroutine alone and is handed back through
	// allWorkSucceededChan once the listener is told to stop, so it is never
	// read and written concurrently and the goroutine does not outlive Upload.
	stopErrorListener := make(chan struct{})
	allWorkSucceededChan := make(chan bool, 1)
	go func() {
		succeeded := true
		defer func() { allWorkSucceededChan <- succeeded }()
		for {
			select {
			case workerErr, ok := <-workerErrorChan:
				if !ok {
					// Channel closed: no further errors can arrive.
					return
				}
				fmt.Println(workerErr)
				succeeded = false
			case <-stopErrorListener:
				// Asked to stop: consume whatever is still immediately
				// available so that no reported error is lost, then exit.
				for {
					select {
					case workerErr, ok := <-workerErrorChan:
						if !ok {
							return
						}
						fmt.Println(workerErr)
						succeeded = false
					default:
						return
					}
				}
			}
		}
	}()

	var err error
L:
	for {
		select {
		case dataWithRange, ok := <-dataWithRangeChan:
			if !ok {
				// The data channel is closed, nothing more to schedule.
				close(requestChan)
				break L
			}

			// Create work request. dataWithRange is declared by this case
			// clause, so the closure below captures the value of this
			// iteration only.
			req := &concurrent.Request{
				Work: func() error {
					err := cxt.BlobServiceClient.PutPage(cxt.ContainerName,
						cxt.BlobName,
						dataWithRange.Range.Start,
						dataWithRange.Range.End,
						storage.PageWriteTypeUpdate,
						dataWithRange.Data,
						nil)
					if err == nil {
						// Only successfully written ranges count as progress.
						uploadProgress.ReportBytesProcessedCount(dataWithRange.Range.Length())
					}
					return err
				},
				// Every error is reported as retryable; how often a request is
				// retried is decided by the load-balancer, not here.
				ShouldRetry: func(e error) bool {
					return true
				},
				ID: dataWithRange.Range.String(),
			}

			// Send work request to load balancer for processing
			requestChan <- req
		case err = <-streamReadErrChan:
			// Reading the disk stream failed: stop scheduling new work and
			// tear the workers down.
			close(requestChan)
			loadBalancer.TearDownWorkers()
			break L
		}
	}

	// Wait for the workers, then stop the error listener and collect its verdict.
	<-allWorkersFinishedChan
	close(stopErrorListener)
	allWorkSucceeded := <-allWorkSucceededChan
	uploadProgress.Close()

	if !allWorkSucceeded {
		// NOTE: this replaces any stream read error recorded above.
		err = errors.New("\nUpload Incomplete: Some blocks of the VHD failed to upload, rerun the command to upload those blocks")
	}

	if err == nil {
		// Print a final status line showing 100% completion.
		fmt.Printf("\r Completed: %3d%% [%10.2f MB] RemainingTime: %02dh:%02dm:%02ds Throughput: %d Mb/sec  %2c ",
			100,
			float64(uploadSizeInBytes)/oneMB,
			0, 0, 0,
			0, ' ')

	}
	return err
}