in benchmark/consumer.go [180:273]
func (bc *consumerBenchmark) run(args []string) {
bc.flags.Parse(args)
if bc.topic == "" {
rlog.Error("Empty Topic", nil)
bc.usage()
return
}
if bc.groupPrefix == "" {
rlog.Error("Empty Group Prefix", nil)
bc.usage()
return
}
if bc.nameSrv == "" {
rlog.Error("Empty Nameserver", nil)
bc.usage()
return
}
if bc.testMinutes <= 0 {
rlog.Error("Test Time Must Be Positive Integer", nil)
bc.usage()
return
}
if bc.instanceCount <= 0 {
rlog.Error("Thread Count Must Be Positive Integer", nil)
bc.usage()
return
}
bc.groupID = bc.groupPrefix
if bc.isPrefixEnable {
bc.groupID += fmt.Sprintf("_%d", time.Now().UnixNano()/int64(time.Millisecond)%100)
}
stati := statiBenchmarkConsumerSnapshot{}
snapshots := consumeSnapshots{cur: &stati}
exitChan := make(chan struct{})
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
bc.consumeMsg(&stati, exitChan)
wg.Done()
}()
// snapshot
wg.Add(1)
go func() {
defer wg.Done()
ticker := time.NewTicker(time.Second)
for {
select {
case <-ticker.C:
snapshots.takeSnapshot()
case <-exitChan:
ticker.Stop()
return
}
}
}()
// print statistic
wg.Add(1)
go func() {
defer wg.Done()
ticker := time.NewTicker(time.Second * 10)
for {
select {
case <-ticker.C:
snapshots.printStati()
case <-exitChan:
ticker.Stop()
return
}
}
}()
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
select {
case <-time.Tick(time.Minute * time.Duration(bc.testMinutes)):
case <-signalChan:
}
close(exitChan)
wg.Wait()
snapshots.takeSnapshot()
snapshots.printStati()
rlog.Info("Test Done", nil)
}