in src/go/cmd/blobuploader/main.go [64:110]
func (b *BlobUploader) Run(syncWaitGroup *sync.WaitGroup) {
start := time.Now()
defer func() { b.JobRunTime = time.Now().Sub(start) }()
log.Info.Printf("started BlobUploader.Run()\n")
defer syncWaitGroup.Done()
var cancel context.CancelFunc
b.Context, cancel = context.WithCancel(b.Context)
// start the ready queue listener and its workers
// this uses the example from here: https://github.com/Azure/azure-storage-queue-go/blob/master/azqueue/zt_examples_test.go
for i := 0; i < b.ThreadCount; i++ {
syncWaitGroup.Add(1)
go b.StartBlobUploader(syncWaitGroup)
}
// dispatch jobs to the workers
dispatchedCount := 0
for dispatchedCount < b.BlobCount {
select {
case <-b.Context.Done():
return
case b.uploadBytesChannel <- b.BlobSizeBytes:
dispatchedCount++
case msg := <-b.successChannel:
b.BlobsUploaded++
b.BytesUploaded += msg
case <-b.failureChannel:
b.FailureCount++
}
}
// wait for completion
for {
select {
case msg := <-b.successChannel:
b.BlobsUploaded++
b.BytesUploaded += msg
case <-b.failureChannel:
b.FailureCount++
}
if (b.BlobsUploaded + b.FailureCount) == b.BlobCount {
cancel()
return
}
}
}