in upload/upload.go [40:133]
// Upload writes the ranges listed in cxt.UploadableRanges from cxt.VhdStream to
// the page blob cxt.BlobName in the container cxt.ContainerName.
//
// Every chunk delivered by GetDataWithRanges is turned into one PutPage work
// request and handed to a load balancer created with cxt.Parallelism. Progress
// is printed to standard output while the upload runs.
//
// The returned error is:
//   - the "Upload Incomplete" error when at least one worker reported a failure,
//   - otherwise the error received from the stream reader, if any,
//   - otherwise nil.
func Upload(cxt *DiskUploadContext) error {
	// Get the channel that contains stream of disk data to upload
	dataWithRangeChan, streamReadErrChan := GetDataWithRanges(cxt.VhdStream, cxt.UploadableRanges)

	// The (unbuffered) channel used to send upload requests to the load-balancer
	requestChan := make(chan *concurrent.Request)

	// Prepare and start the load-balancer that load request across 'cxt.Parallelism' workers
	loadBalancer := concurrent.NewBalancer(cxt.Parallelism)
	loadBalancer.Init()
	workerErrorChan, allWorkersFinishedChan := loadBalancer.Run(requestChan)

	// Calculate the actual size of the data to upload
	uploadSizeInBytes := int64(0)
	for _, r := range cxt.UploadableRanges {
		uploadSizeInBytes += r.Length()
	}
	fmt.Printf("\nEffective upload size: %.2f MB (from %.2f MB originally)", float64(uploadSizeInBytes)/oneMB, float64(cxt.VhdStream.GetSize())/oneMB)

	// Prepare and start the upload progress tracker
	uploadProgress := progress.NewStatus(cxt.Parallelism, cxt.AlreadyProcessedBytes, uploadSizeInBytes, progress.NewComputestateDefaultSize())
	progressChan := uploadProgress.Run()

	// read progress status from progress tracker and print it
	go readAndPrintProgress(progressChan, cxt.Resume)

	// Listen for errors reported by workers and print them. The success flag is
	// owned exclusively by this goroutine and handed back over a channel once
	// the listener is told to stop, so it is never shared unsynchronized with
	// the caller and the goroutine does not outlive Upload.
	stopErrorListener := make(chan struct{})
	allWorkSucceededChan := make(chan bool, 1)
	go func() {
		succeeded := true
		errChan := workerErrorChan
		for {
			select {
			case workerErr, ok := <-errChan:
				if !ok {
					// A closed channel would otherwise be received from in a
					// tight loop; a nil channel is never selected.
					errChan = nil
					continue
				}
				fmt.Println(workerErr)
				succeeded = false
			case <-stopErrorListener:
				allWorkSucceededChan <- succeeded
				return
			}
		}
	}()

	var err error
dispatch:
	for {
		select {
		case dataWithRange, ok := <-dataWithRangeChan:
			if !ok {
				// No more data: tell the load balancer no further requests will come.
				close(requestChan)
				break dispatch
			}

			// Create work request
			req := &concurrent.Request{
				Work: func() error {
					err := cxt.BlobServiceClient.PutPage(cxt.ContainerName,
						cxt.BlobName,
						dataWithRange.Range.Start,
						dataWithRange.Range.End,
						storage.PageWriteTypeUpdate,
						dataWithRange.Data,
						nil)
					if err == nil {
						// Only successfully written bytes count towards progress.
						uploadProgress.ReportBytesProcessedCount(dataWithRange.Range.Length())
					}
					return err
				},
				// Every failure is considered retryable.
				ShouldRetry: func(e error) bool {
					return true
				},
				ID: dataWithRange.Range.String(),
			}

			// Send work request to load balancer for processing
			requestChan <- req

		case err = <-streamReadErrChan:
			// Reading the source failed: stop feeding requests and tear down the workers.
			close(requestChan)
			loadBalancer.TearDownWorkers()
			break dispatch
		}
	}

	<-allWorkersFinishedChan

	// Stop the error listener and collect its verdict; the channel receive
	// orders the listener's writes before this read.
	close(stopErrorListener)
	allWorkSucceeded := <-allWorkSucceededChan

	uploadProgress.Close()

	if !allWorkSucceeded {
		err = errors.New("\nUpload Incomplete: Some blocks of the VHD failed to upload, rerun the command to upload those blocks")
	}

	if err == nil {
		// Print a final 100% status line over the last progress line.
		fmt.Printf("\r Completed: %3d%% [%10.2f MB] RemainingTime: %02dh:%02dm:%02ds Throughput: %d Mb/sec %2c ",
			100,
			float64(uploadSizeInBytes)/oneMB,
			0, 0, 0,
			0, ' ')
	}
	return err
}