static async Task Main()

in DataGenerators/TwitterClientCore/Program.cs [24:76]


        static async Task Main(string[] args)
        {
            //Configure Twitter OAuth
            var oauthToken = ConfigurationManager.AppSettings["oauth_token"];
            var oauthTokenSecret = ConfigurationManager.AppSettings["oauth_token_secret"];
            var oauthCustomerKey = ConfigurationManager.AppSettings["oauth_consumer_key"];
            var oauthConsumerSecret = ConfigurationManager.AppSettings["oauth_consumer_secret"];
            var keywords = ConfigurationManager.AppSettings["twitter_keywords"];

            var producer = new EventHubProducerClient(
                ConfigurationManager.AppSettings["EventHubConnectionString"],
                ConfigurationManager.AppSettings["EventHubName"],
                new EventHubProducerClientOptions() {
                }
            );

            Console.WriteLine($"Sending data eventhub : {producer.EventHubName} PartitionCount = {(await producer.GetPartitionIdsAsync()).Count()}");
            
            IObservable<string> twitterStream = TwitterStream.StreamStatuses(
                new TwitterConfig(
                    oauthToken, 
                    oauthTokenSecret, 
                    oauthCustomerKey, 
                    oauthConsumerSecret,
                    keywords))
                    .ToObservable();

            int maxMessageSizeInBytes = 250 * 1024;
            int maxSecondsToBuffer = 20;

            IObservable<EventData> eventDataObserver = Observable.Create<EventData>(
                outputObserver => twitterStream.Subscribe(
                    new EventDataGenerator(outputObserver, maxMessageSizeInBytes, maxSecondsToBuffer)));
            
            // keep upto 5 ongoing requests.
            int maxRequestsInProgress = 5;
            IObservable<Task> sendTasks = eventDataObserver
            .Select(e => 
            {
                var batch = producer.CreateBatchAsync().Result;
                if(!batch.TryAdd(e))
                {
                    throw new ArgumentOutOfRangeException("Content too big to send in a single eventhub message");
                }
                return producer.SendAsync(batch);
            })
            .Buffer(TimeSpan.FromMinutes(1), maxRequestsInProgress)
            .Select(sendTaskList => Task.WhenAll(sendTaskList));
            
            var subscription = sendTasks.Subscribe(
                sendEventDatasTask => sendEventDatasTask.Wait(),
                e => Console.WriteLine(e));
        }