func main()

in src/go/cmd/edasim/statscollector/main.go [100:180]


func main() {
	statsFilePath,
		eventHubSenderName,
		eventHubSenderKey,
		eventHubNamespaceName,
		eventHubHubName,
		uniqueName := initializeApplicationVariables()

	ioStatsCollector := file.InitializeIOStatsCollector(uniqueName)

	provider, err := sas.NewTokenProvider(sas.TokenProviderWithKey(eventHubSenderName, eventHubSenderKey))
	if err != nil {
		log.Error.Fatalf("failed to get token provider: %s\n", err)
	}

	// get an existing hub
	hub, err := eventhubs.NewHub(eventHubNamespaceName, eventHubHubName, provider)
	ctx := context.Background()
	defer hub.Close(ctx)
	if err != nil {
		log.Error.Fatalf("failed to get hub: %s\n", err)
	}

	// get info about partitions in hub
	info, err := hub.GetRuntimeInformation(ctx)
	if err != nil {
		log.Error.Fatalf("failed to get runtime info: %s\n", err)
	}
	log.Info.Printf("partition IDs: %s\n", info.PartitionIDs)

	// set up wait group to wait for expected message
	eventReceived := make(chan struct{})

	// declare handler for incoming events
	handler := func(ctx context.Context, event *eventhubs.Event) error {
		ioStatsCollector.RecordEvent(string(event.Data))
		eventReceived <- struct{}{}
		return nil
	}

	receiveOption := eventhubs.ReceiveWithStartingOffset(persist.StartOfStream)

	for _, partitionID := range info.PartitionIDs {
		_, err := hub.Receive(
			ctx,
			partitionID,
			handler,
			receiveOption,
		)
		if err != nil {
			log.Error.Fatalf("failed to receive for partition ID %s: %s\n", partitionID, err)
		}
	}

	lastStatsOutput := time.Now()
	lastEventReceived := time.Now()
	ticker := time.NewTicker(time.Duration(millisecondsSleep) * time.Millisecond)
	defer ticker.Stop()
	messagesProcessed := 0
	for time.Since(lastEventReceived) <= quitAfterInactiveSeconds {
		select {
		case <-eventReceived:
			lastEventReceived = time.Now()
			messagesProcessed++
		case <-ticker.C:
			if time.Since(lastStatsOutput) > statsPrintRate {
				lastStatsOutput = time.Now()
				log.Info.Printf("event messages processed %d", messagesProcessed)
			}
		}
	}

	log.Info.Printf("writing the files")
	ioStatsCollector.WriteRAWFiles(statsFilePath, uniqueName)

	log.Info.Printf("writing the summary file")
	ioStatsCollector.WriteBatchSummaryFiles(statsFilePath, uniqueName)

	log.Info.Printf("writing the io summary files")
	ioStatsCollector.WriteIOSummaryFiles(statsFilePath, uniqueName)
}