in src/go/cmd/edasim/statscollector/main.go [100:180]
func main() {
statsFilePath,
eventHubSenderName,
eventHubSenderKey,
eventHubNamespaceName,
eventHubHubName,
uniqueName := initializeApplicationVariables()
ioStatsCollector := file.InitializeIOStatsCollector(uniqueName)
provider, err := sas.NewTokenProvider(sas.TokenProviderWithKey(eventHubSenderName, eventHubSenderKey))
if err != nil {
log.Error.Fatalf("failed to get token provider: %s\n", err)
}
// get an existing hub
hub, err := eventhubs.NewHub(eventHubNamespaceName, eventHubHubName, provider)
ctx := context.Background()
defer hub.Close(ctx)
if err != nil {
log.Error.Fatalf("failed to get hub: %s\n", err)
}
// get info about partitions in hub
info, err := hub.GetRuntimeInformation(ctx)
if err != nil {
log.Error.Fatalf("failed to get runtime info: %s\n", err)
}
log.Info.Printf("partition IDs: %s\n", info.PartitionIDs)
// set up wait group to wait for expected message
eventReceived := make(chan struct{})
// declare handler for incoming events
handler := func(ctx context.Context, event *eventhubs.Event) error {
ioStatsCollector.RecordEvent(string(event.Data))
eventReceived <- struct{}{}
return nil
}
receiveOption := eventhubs.ReceiveWithStartingOffset(persist.StartOfStream)
for _, partitionID := range info.PartitionIDs {
_, err := hub.Receive(
ctx,
partitionID,
handler,
receiveOption,
)
if err != nil {
log.Error.Fatalf("failed to receive for partition ID %s: %s\n", partitionID, err)
}
}
lastStatsOutput := time.Now()
lastEventReceived := time.Now()
ticker := time.NewTicker(time.Duration(millisecondsSleep) * time.Millisecond)
defer ticker.Stop()
messagesProcessed := 0
for time.Since(lastEventReceived) <= quitAfterInactiveSeconds {
select {
case <-eventReceived:
lastEventReceived = time.Now()
messagesProcessed++
case <-ticker.C:
if time.Since(lastStatsOutput) > statsPrintRate {
lastStatsOutput = time.Now()
log.Info.Printf("event messages processed %d", messagesProcessed)
}
}
}
log.Info.Printf("writing the files")
ioStatsCollector.WriteRAWFiles(statsFilePath, uniqueName)
log.Info.Printf("writing the summary file")
ioStatsCollector.WriteBatchSummaryFiles(statsFilePath, uniqueName)
log.Info.Printf("writing the io summary files")
ioStatsCollector.WriteIOSummaryFiles(statsFilePath, uniqueName)
}