e2etest/stress_generators/threading.go (97 lines of code) (raw):
package main
import (
"fmt"
"github.com/Azure/azure-storage-azcopy/v10/e2etest"
"runtime"
"sync"
"sync/atomic"
"time"
)
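/*
This is a deliberately quick-and-dirty worker pool for the stress generators:
a fixed set of goroutines draining two package-level job queues. It is really
slapdash, so do not treat it as a general-purpose scheduler.
*/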
var (
	// ThreadCount is the number of worker goroutines started by init and the
	// buffer size of both job queues: four per logical CPU, capped at 64.
	ThreadCount = func() int64 {
		const maxThreads = 64 // mirror AzCopy's thread count

		if perCPU := int64(runtime.NumCPU()) * 4; perCPU < maxThreads {
			return perCPU
		}
		return maxThreads
	}()

	// PriorityJobQueue carries jobs that were scheduled with prio set.
	PriorityJobQueue = make(chan func(), ThreadCount)

	// JobQueue carries jobs that were scheduled without prio set.
	JobQueue = make(chan func(), ThreadCount)
)
// init starts ThreadCount worker goroutines that execute jobs received from
// PriorityJobQueue and JobQueue.
//
// Each worker prefers the priority queue: it first polls PriorityJobQueue
// without blocking and only falls back to waiting on both queues when no
// priority job is ready. A worker exits once it observes that either queue
// has been closed; nil jobs are skipped.
func init() {
	for range ThreadCount {
		go func() {
			for {
				var (
					job func()
					ok  bool
				)

				// A single select over both channels picks uniformly at
				// random among the ready cases, which gives priority jobs no
				// actual precedence. Poll the priority queue first, then
				// block on both so the worker still wakes for either kind.
				select {
				case job, ok = <-PriorityJobQueue:
				default:
					select {
					case job, ok = <-PriorityJobQueue:
					case job, ok = <-JobQueue:
					}
				}

				if job != nil {
					job()
				}
				if !ok {
					// The queue we received from was closed: shut down.
					return
				}
			}
		}()
	}
}
// NewGenerationJobManager returns a manager that expects itemCount items.
//
// The internal WaitGroup is primed with itemCount, so Wait will block until
// that many scheduled items have finished. statusIncrement is stored as-is
// and controls how often ScheduleItem prints a progress line (every time the
// success count is a multiple of it).
func NewGenerationJobManager(itemCount int, statusIncrement int64) *GenerationJobManager {
	pending := &sync.WaitGroup{}
	pending.Add(itemCount)

	return &GenerationJobManager{
		wg:              pending,
		totalCount:      e2etest.PtrOf[int64](int64(itemCount)),
		doneCount:       e2etest.PtrOf[int64](0),
		failureCount:    e2etest.PtrOf[int64](0),
		statusIncrement: statusIncrement,
	}
}
// GenerationJobManager tracks a batch of generation jobs: how many are
// expected, how many succeeded or failed, and when all of them have finished.
type GenerationJobManager struct {
	// wg counts items that are expected but not yet finished; Wait blocks on it.
	wg *sync.WaitGroup
	// totalCount is the number of expected items; updated atomically.
	totalCount *int64
	// doneCount is the number of items whose job returned a nil error; updated atomically.
	doneCount *int64
	// failureCount is the number of items whose job returned an error; updated atomically.
	failureCount *int64
	// statusIncrement is the progress interval: a status line is printed
	// whenever doneCount is a multiple of this value.
	statusIncrement int64
	// CustomAnnounce, when non-nil, is called for each status line and its
	// result is appended to the line after a single space.
	CustomAnnounce func() string
}
// ScheduleAdditionalItems registers n more expected items: the WaitGroup
// counter grows by n and the reported total is atomically increased by n.
func (gjm *GenerationJobManager) ScheduleAdditionalItems(n int) {
	extra := int64(n)

	gjm.wg.Add(n)
	atomic.AddInt64(gjm.totalCount, extra)
}
// ScheduleItem queues f for execution by the worker pool and returns
// immediately.
//
// f is wrapped in a job that always marks one item as finished on the
// WaitGroup, counts a failure (and prints the error) when f returns a non-nil
// error, and otherwise counts a success. Whenever the success count is a
// multiple of statusIncrement a progress line is printed, followed by the
// result of CustomAnnounce when that hook is set. A statusIncrement of zero
// disables progress output.
//
// When prio is true the job is sent to PriorityJobQueue, otherwise to
// JobQueue. The send happens on its own goroutine so the caller never blocks
// on a full queue.
func (gjm *GenerationJobManager) ScheduleItem(f func() error, prio bool) {
	job := func() {
		defer gjm.wg.Done()

		if err := f(); err != nil {
			atomic.AddInt64(gjm.failureCount, 1)
			fmt.Printf("failed to create an object: %s\n", err)
			return
		}

		done := atomic.AddInt64(gjm.doneCount, 1)

		// The modulo below panics with a divide-by-zero when the increment is
		// zero, which would take down the whole process from a worker
		// goroutine; treat zero as "no progress output" instead.
		if gjm.statusIncrement == 0 || done%gjm.statusIncrement != 0 {
			return
		}

		stamp := time.Now().Format(time.Stamp)
		total := atomic.LoadInt64(gjm.totalCount)
		suffix := ""
		if gjm.CustomAnnounce != nil {
			suffix = " " + gjm.CustomAnnounce()
		}
		fmt.Printf("%v: generated %d items of %d!%v\n", stamp, done, total, suffix)
	}

	go func() {
		if prio {
			PriorityJobQueue <- job
			return
		}
		JobQueue <- job
	}()
}
// Wait blocks until the manager's WaitGroup counter drops to zero, i.e. until
// every item registered with the manager has been marked as finished.
func (gjm *GenerationJobManager) Wait() {
	gjm.wg.Wait()
}