public static async Task Main()

in Sample-Code-Snippets/dotnet/EventHubs-Emulator-Kafka-Demo/EventHubs-Emulator-Kafka-Demo/Program.cs [5:77]


    public static async Task Main(string[] args)
    {
        // Configuration
        var kafkaBootstrapServers = "localhost:9092";
        var eventHubsConnectionString = "Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;";
        string eventHubName = "eh1";


        // Producer
        int produceCount = 100;
        var producerConfig = new ProducerConfig
        {
            BootstrapServers = kafkaBootstrapServers,
            SecurityProtocol = SecurityProtocol.SaslPlaintext,
            SaslMechanism = SaslMechanism.Plain,
            SaslUsername = "$ConnectionString",
            SaslPassword = eventHubsConnectionString
        };
        using (var producer = new ProducerBuilder<Null, string>(producerConfig).Build())
        {
            for (int i = 0; i < produceCount; i++)
            {
                var message = new Message<Null, string> { Value = $"Message {i}" };
                var deliveryResult = await producer.ProduceAsync(eventHubName, message);
                Console.WriteLine($"Delivered '{deliveryResult.Value}' to '{deliveryResult.TopicPartitionOffset}'");
            }
        }


        // Consumer
        int receiveCount = 0;
        string consumerGroupId = "cg1";
        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = kafkaBootstrapServers,
            SecurityProtocol = SecurityProtocol.SaslPlaintext,
            SaslMechanism = SaslMechanism.Plain,
            SaslUsername = "$ConnectionString",
            SaslPassword = eventHubsConnectionString,
            GroupId = consumerGroupId,
            EnableAutoCommit = true,
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
        using (var consumer = new ConsumerBuilder<Null, string>(consumerConfig).Build())
        {
            consumer.Subscribe(eventHubName);

            CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));
            Console.CancelKeyPress += (_, e) => {
                e.Cancel = true;
                cts.Cancel();
            };

            try
            {
                while (receiveCount < produceCount)
                {
                    var cr = consumer.Consume(cts.Token);
                    Console.WriteLine($"Consumed message '{cr.Message.Value}' from: '{cr.TopicPartitionOffset}'.");
                    receiveCount += 1;
                }
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Cancelled!");
            }

            consumer.Close();
        }

        Console.WriteLine($"Produced: {produceCount} messages");
        Console.WriteLine($"Consumed: {receiveCount} messages");
    }