func()

in benchmark/consumer.go [180:273]


// run parses args into the benchmark's flag set, validates the resulting
// settings, and then drives the consume benchmark until either the configured
// number of test minutes elapses or SIGINT/SIGTERM is received.
//
// While the benchmark is running, a snapshot is taken every second and the
// statistics are printed every ten seconds. After all background goroutines
// have exited, one final snapshot is taken and printed.
//
// If any required setting is missing or non-positive, run logs the problem,
// prints usage, and returns without starting the benchmark.
func (bc *consumerBenchmark) run(args []string) {
	// NOTE(review): the result of Parse is discarded, as before. If bc.flags is
	// a *flag.FlagSet that does not exit on error, a bad flag falls through to
	// the validation below — confirm this is intended.
	bc.flags.Parse(args)

	var problem string
	switch {
	case bc.topic == "":
		problem = "Empty Topic"
	case bc.groupPrefix == "":
		problem = "Empty Group Prefix"
	case bc.nameSrv == "":
		problem = "Empty Nameserver"
	case bc.testMinutes <= 0:
		problem = "Test Time Must Be Positive Integer"
	case bc.instanceCount <= 0:
		problem = "Thread Count Must Be Positive Integer"
	}
	if problem != "" {
		rlog.Error(problem, nil)
		bc.usage()
		return
	}

	bc.groupID = bc.groupPrefix
	if bc.isPrefixEnable {
		// Suffix is the current Unix time in milliseconds, modulo 100.
		bc.groupID += fmt.Sprintf("_%d", time.Now().UnixNano()/int64(time.Millisecond)%100)
	}

	stati := statiBenchmarkConsumerSnapshot{}
	snapshots := consumeSnapshots{cur: &stati}
	// Closing exitChan tells every background goroutine to stop.
	exitChan := make(chan struct{})

	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		bc.consumeMsg(&stati, exitChan)
	}()

	// runEvery starts a goroutine that calls fn once per interval until
	// exitChan is closed. The ticker is always released on exit.
	runEvery := func(interval time.Duration, fn func()) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			ticker := time.NewTicker(interval)
			defer ticker.Stop()
			for {
				select {
				case <-ticker.C:
					fn()
				case <-exitChan:
					return
				}
			}
		}()
	}

	// snapshot
	runEvery(time.Second, func() { snapshots.takeSnapshot() })
	// print statistic
	runEvery(time.Second*10, func() { snapshots.printStati() })

	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
	// Unregister on return so the handler does not outlive this call.
	defer signal.Stop(signalChan)

	// A one-shot timer is used for the deadline; unlike time.Tick it can be
	// stopped, so nothing keeps firing after run returns.
	deadline := time.NewTimer(time.Minute * time.Duration(bc.testMinutes))
	defer deadline.Stop()

	select {
	case <-deadline.C:
	case <-signalChan:
	}

	close(exitChan)
	wg.Wait()
	snapshots.takeSnapshot()
	snapshots.printStati()
	rlog.Info("Test Done", nil)
}