private async Task ReaderTask()

in Amazon.KinesisTap.Windows/WindowsEventPollingSource.cs [156:227]


        /// <summary>
        /// Read loop for this source. Initializes the bookmark location, then repeatedly opens an
        /// event log reader at the current bookmark, hands each record to <c>OnEventRecord</c>
        /// (at most <c>BatchSize</c> records per pass), and waits before the next pass.
        /// The loop ends when <paramref name="stopToken"/> is cancelled.
        /// </summary>
        /// <param name="stopToken">Token that signals the reader to stop.</param>
        /// <returns>A task that completes once the reader has stopped.</returns>
        private async Task ReaderTask(CancellationToken stopToken)
        {
            var delay = _options.MinReaderDelayMs;
            long? prevRecordId = null;

            try
            {
                await InitializeBookmarkLocation(stopToken);
            }
            catch (OperationCanceledException) when (stopToken.IsCancellationRequested)
            {
                // Stop was requested while initializing: exit without entering the read loop.
                return;
            }
            await Task.Yield();

            _logger.LogInformation("{0} Id {1} started reading", nameof(WindowsEventPollingSource), Id);
            while (!stopToken.IsCancellationRequested)
            {
                var batchCount = 0;
                try
                {
                    // setting batch size is pretty important for performance, as the .NET wrapper will attempt to query events in batches
                    // see https://referencesource.microsoft.com/#system.core/System/Diagnostics/Eventing/Reader/EventLogReader.cs,149
                    using (var eventLogReader = await CreateEventLogReader(_eventBookmark, stopToken))
                    {
                        // NOTE(review): EventRecord implements IDisposable and is not disposed here;
                        // confirm whether OnEventRecord takes ownership of the record.
                        EventRecord entry;
                        while ((entry = DoRead(eventLogReader)) != null)
                        {
                            batchCount++;
#if DEBUG
                            Interlocked.Increment(ref _total);
#endif
                            // Capture the bookmark before processing; it is only committed to
                            // _eventBookmark after OnEventRecord reports success.
                            var newEventBookmark = entry.Bookmark;
                            if (!OnEventRecord(entry, prevRecordId, stopToken))
                            {
                                // The record was not accepted: wait for the dependency, then end this
                                // pass without advancing the bookmark so the next reader starts from
                                // the last committed position.
                                await EnsureDependencyAvailable(stopToken);
                                break;
                            }

                            _eventBookmark = newEventBookmark;
                            prevRecordId = entry.RecordId;

                            if (batchCount == BatchSize)
                            {
                                _logger.LogDebug($"Read max possible number of events: {BatchSize}");
                                break;
                            }
                            stopToken.ThrowIfCancellationRequested();
                        }

                        // entry is null only when the reader ran out of records (not when we broke out early).
                        if (entry is null)
                        {
                            _logger.LogDebug("Read {0} events", batchCount);
                        }
                    }

                    delay = GetNextReaderDelayMs(delay, batchCount, _logger);
                    _logger.LogDebug("Delaying {0}ms", delay);
                    await Task.Delay(delay, stopToken);
                }
                catch (OperationCanceledException) when (stopToken.IsCancellationRequested)
                {
                    _logger.LogInformation("Event Reader stopped");
                    break;
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error reading events");

                    // Back off before retrying. Without this wait a persistent failure (for example the
                    // reader failing to open on every attempt) would spin the loop and flood the log.
                    // The wait is skipped for a non-positive delay so it cannot itself throw.
                    if (delay > 0)
                    {
                        try
                        {
                            await Task.Delay(delay, stopToken);
                        }
                        catch (OperationCanceledException) when (stopToken.IsCancellationRequested)
                        {
                            _logger.LogInformation("Event Reader stopped");
                            break;
                        }
                    }
                }
            }

            _logger.LogInformation("{0} Id {1} stopped reading", nameof(WindowsEventPollingSource), Id);
        }