in benchmark/producer.go [189:286]
func (bp *producerBenchmark) run(args []string) {
bp.flags.Parse(args)
if bp.topic == "" {
rlog.Error("Empty Topic", nil)
bp.flags.Usage()
return
}
if bp.groupID == "" {
rlog.Error("Empty Group Id", nil)
bp.flags.Usage()
return
}
if bp.nameSrv == "" {
rlog.Error("Empty Nameserver", nil)
bp.flags.Usage()
return
}
if bp.instanceCount <= 0 {
rlog.Error("Instance Count Must Be Positive Integer", nil)
bp.flags.Usage()
return
}
if bp.testMinutes <= 0 {
rlog.Error("Test Time Must Be Positive Integer", nil)
bp.flags.Usage()
return
}
if bp.bodySize <= 0 {
rlog.Error("Body Size Must Be Positive Integer", nil)
bp.flags.Usage()
return
}
stati := statiBenchmarkProducerSnapshot{}
snapshots := produceSnapshots{cur: &stati}
exitChan := make(chan struct{})
wg := sync.WaitGroup{}
for i := 0; i < bp.instanceCount; i++ {
i := i
go func() {
wg.Add(1)
bp.produceMsg(&stati, exitChan)
rlog.Info("Producer Done and Exit", map[string]interface{}{
"id": i,
})
wg.Done()
}()
}
// snapshot
wg.Add(1)
go func() {
defer wg.Done()
ticker := time.NewTicker(time.Second)
for {
select {
case <-ticker.C:
snapshots.takeSnapshot()
case <-exitChan:
ticker.Stop()
return
}
}
}()
// print statistic
wg.Add(1)
go func() {
defer wg.Done()
ticker := time.NewTicker(time.Second * 10)
for {
select {
case <-ticker.C:
snapshots.printStati()
case <-exitChan:
ticker.Stop()
return
}
}
}()
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
select {
case <-time.Tick(time.Minute * time.Duration(bp.testMinutes)):
case <-signalChan:
}
close(exitChan)
wg.Wait()
snapshots.takeSnapshot()
snapshots.printStati()
rlog.Info("Test Done", nil)
}