func()

in benchmark/producer.go [189:286]


// run parses args into the benchmark's flag set, validates the resulting
// configuration, and then drives the producer benchmark until either the
// configured number of test minutes has elapsed or the process receives
// SIGINT/SIGTERM.
//
// While the benchmark is running it starts:
//   - bp.instanceCount goroutines, each calling bp.produceMsg with the shared
//     statistics snapshot and the exit channel;
//   - one goroutine that calls takeSnapshot every second;
//   - one goroutine that calls printStati every ten seconds.
//
// On shutdown the exit channel is closed, every goroutine is waited for, and a
// final snapshot is taken and printed. If any required flag is missing or
// non-positive, run logs the problem, prints usage, and returns without
// starting anything.
func (bp *producerBenchmark) run(args []string) {
	bp.flags.Parse(args)

	if bp.topic == "" {
		rlog.Error("Empty Topic", nil)
		bp.flags.Usage()
		return
	}

	if bp.groupID == "" {
		rlog.Error("Empty Group Id", nil)
		bp.flags.Usage()
		return
	}

	if bp.nameSrv == "" {
		rlog.Error("Empty Nameserver", nil)
		bp.flags.Usage()
		return
	}
	if bp.instanceCount <= 0 {
		rlog.Error("Instance Count Must Be Positive Integer", nil)
		bp.flags.Usage()
		return
	}
	if bp.testMinutes <= 0 {
		rlog.Error("Test Time Must Be Positive Integer", nil)
		bp.flags.Usage()
		return
	}
	if bp.bodySize <= 0 {
		rlog.Error("Body Size Must Be Positive Integer", nil)
		bp.flags.Usage()
		return
	}

	stati := statiBenchmarkProducerSnapshot{}
	snapshots := produceSnapshots{cur: &stati}
	// Closing exitChan is the single stop signal shared by every goroutine
	// started below.
	exitChan := make(chan struct{})
	var wg sync.WaitGroup

	for i := 0; i < bp.instanceCount; i++ {
		i := i // per-iteration copy for the closure (needed before Go 1.22)
		// Add must happen before the goroutine is started; calling it inside
		// the goroutine races with wg.Wait and may let Wait return before a
		// late-scheduled producer has been counted.
		wg.Add(1)
		go func() {
			defer wg.Done()
			// NOTE(review): presumably produceMsg returns once exitChan is
			// closed — confirm against its implementation.
			bp.produceMsg(&stati, exitChan)
			rlog.Info("Producer Done and Exit", map[string]interface{}{
				"id": i,
			})
		}()
	}

	// every runs fn once per interval in its own goroutine until exitChan is
	// closed. The goroutine is tracked by wg.
	every := func(interval time.Duration, fn func()) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			ticker := time.NewTicker(interval)
			defer ticker.Stop()
			for {
				select {
				case <-ticker.C:
					fn()
				case <-exitChan:
					return
				}
			}
		}()
	}

	// snapshot
	every(time.Second, func() { snapshots.takeSnapshot() })

	// print statistic
	every(time.Second*10, func() { snapshots.printStati() })

	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
	// Unregister the handler on return so later signals are no longer routed
	// to a channel nobody reads.
	defer signal.Stop(signalChan)

	// A one-shot timer is sufficient here; unlike time.Tick it can be stopped
	// and released when a signal ends the test early.
	deadline := time.NewTimer(time.Minute * time.Duration(bp.testMinutes))
	defer deadline.Stop()

	select {
	case <-deadline.C:
	case <-signalChan:
	}

	close(exitChan)
	wg.Wait()
	// Capture and report whatever was produced since the last periodic tick.
	snapshots.takeSnapshot()
	snapshots.printStati()
	rlog.Info("Test Done", nil)
}