in src/go/cmd/eventhubtester/main.go [55:157]
func testEventHub() {
var enableDebugging = flag.Bool("enableDebugging", false, "enable debug logging")
flag.Parse()
if *enableDebugging {
log.EnableDebugging()
}
if envVarsAvailable := verifyEnvVars(); !envVarsAvailable {
usage()
os.Exit(1)
}
eventHubSenderName := cli.GetEnv(azure.AZURE_EVENTHUB_SENDERKEYNAME)
eventHubSenderKey := cli.GetEnv(azure.AZURE_EVENTHUB_SENDERKEY)
eventHubNamespaceName := cli.GetEnv(azure.AZURE_EVENTHUB_NAMESPACENAME)
eventHubHubName := cli.GetEnv(AZURE_EVENTHUB_HUBNAME)
// create the new event Hub name
log.Info.Printf("new event hub manager")
connectionString := createHubConnectionString(eventHubSenderName, eventHubSenderKey, eventHubNamespaceName)
hubmanager, err := eventhubs.NewHubManagerFromConnectionString(connectionString)
if err != nil {
panic(err)
}
log.Info.Printf("new event hub %s", eventHubHubName)
if _, err = hubmanager.Put(context.Background(), eventHubHubName); err == nil {
log.Info.Printf("created event hub %s", eventHubHubName)
} else {
if strings.Contains(err.Error(), "409") {
log.Info.Printf("the event hub %s already exists", eventHubHubName)
} else {
panic(err)
}
}
log.Info.Printf("new token provider")
provider, err := sas.NewTokenProvider(sas.TokenProviderWithKey(eventHubSenderName, eventHubSenderKey))
if err != nil {
panic(err)
}
log.Info.Printf("new hub %s, %s", eventHubNamespaceName, eventHubHubName)
hub, err := eventhubs.NewHub(eventHubNamespaceName, eventHubHubName, provider)
if err != nil {
panic(err)
}
log.Info.Printf("put a new event")
event := eventhubs.NewEvent([]byte(fmt.Sprintf("hello world %v", time.Now())))
if err := hub.Send(context.Background(), event); err != nil {
panic(err)
}
// set up wait group to wait for expected message
eventReceived := make(chan struct{})
// declare handler for incoming events
handler := func(ctx context.Context, event *eventhubs.Event) error {
log.Info.Printf("received: %s\n", string(event.Data))
// notify channel that event was received
eventReceived <- struct{}{}
return nil
}
info, err := hub.GetRuntimeInformation(context.Background())
if err != nil {
panic(err)
}
var receiveOption eventhubs.ReceiveOption
receiveOption = eventhubs.ReceiveWithStartingOffset(persist.StartOfStream)
for _, partitionID := range info.PartitionIDs {
_, err := hub.Receive(
context.Background(),
partitionID,
handler,
receiveOption,
)
if err != nil {
l.Fatalf("failed to receive for partition ID %s: %s\n", partitionID, err)
}
}
lastEventReceived := time.Now()
lastStatsOutput := time.Now()
ticker := time.NewTicker(time.Duration(10) * time.Millisecond)
defer ticker.Stop()
for time.Since(lastEventReceived) <= quitAfterInactiveSeconds {
select {
case <-eventReceived:
lastEventReceived = time.Now()
case <-ticker.C:
if time.Since(lastStatsOutput) > statsPrintRate {
lastStatsOutput = time.Now()
log.Info.Printf("still receiving events")
}
}
}
}