in Amazon.KinesisTap.Windows/WindowsEventPollingSource.cs [156:227]
/// <summary>
/// Background loop that polls the event log in batches until <paramref name="stopToken"/> is cancelled.
/// Each iteration opens a reader at the current bookmark, hands up to <c>BatchSize</c> records to
/// <c>OnEventRecord</c>, then waits for a delay computed by <c>GetNextReaderDelayMs</c> before polling again.
/// </summary>
/// <param name="stopToken">Cancellation token; when it is cancelled the loop exits.</param>
/// <returns>A task that completes once the loop has exited.</returns>
/// <remarks>
/// Only cancellation raised while <paramref name="stopToken"/> is cancelled is swallowed during bookmark
/// initialization; any other exception from <c>InitializeBookmarkLocation</c> propagates to the caller.
/// Exceptions thrown inside the polling loop are logged and the loop retries after the current delay.
/// </remarks>
private async Task ReaderTask(CancellationToken stopToken)
{
    var delay = _options.MinReaderDelayMs;
    long? prevRecordId = null;

    try
    {
        await InitializeBookmarkLocation(stopToken);
    }
    catch (OperationCanceledException) when (stopToken.IsCancellationRequested)
    {
        // Stopped before any reading started.
        return;
    }

    await Task.Yield();
    _logger.LogInformation("{0} Id {1} started reading", nameof(WindowsEventPollingSource), Id);

    while (!stopToken.IsCancellationRequested)
    {
        var batchCount = 0;
        try
        {
            // setting batch size is pretty important for performance, as the .NET wrapper will attempt to query events in batches
            // see https://referencesource.microsoft.com/#system.core/System/Diagnostics/Eventing/Reader/EventLogReader.cs,149
            using (var eventLogReader = await CreateEventLogReader(_eventBookmark, stopToken))
            {
                // NOTE(review): EventRecord is IDisposable and is not disposed here; confirm whether
                // OnEventRecord takes ownership of the record before adding a Dispose call.
                EventRecord entry;
                while ((entry = DoRead(eventLogReader)) != null)
                {
                    batchCount++;
#if DEBUG
                    Interlocked.Increment(ref _total);
#endif
                    // Capture the bookmark first, but only commit it once the record has been accepted,
                    // so a rejected record is read again by the next reader.
                    var newEventBookmark = entry.Bookmark;
                    if (!OnEventRecord(entry, prevRecordId, stopToken))
                    {
                        // The record was not accepted: wait on EnsureDependencyAvailable and end this batch
                        // without advancing the bookmark.
                        await EnsureDependencyAvailable(stopToken);
                        break;
                    }

                    _eventBookmark = newEventBookmark;
                    prevRecordId = entry.RecordId;

                    if (batchCount == BatchSize)
                    {
                        _logger.LogDebug("Read max possible number of events: {0}", BatchSize);
                        break;
                    }

                    stopToken.ThrowIfCancellationRequested();
                }

                // entry is null only when DoRead returned no further record for this reader.
                if (entry is null)
                {
                    _logger.LogDebug("Read {0} events", batchCount);
                }
            }

            delay = GetNextReaderDelayMs(delay, batchCount, _logger);
            _logger.LogDebug("Delaying {0}ms", delay);
            await Task.Delay(delay, stopToken);
        }
        catch (OperationCanceledException) when (stopToken.IsCancellationRequested)
        {
            _logger.LogInformation("Event Reader stopped");
            break;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error reading events");

            // Wait before retrying so a persistent failure does not turn into a tight loop
            // that spins the CPU and floods the error log.
            try
            {
                await Task.Delay(delay, stopToken);
            }
            catch (OperationCanceledException) when (stopToken.IsCancellationRequested)
            {
                _logger.LogInformation("Event Reader stopped");
                break;
            }
        }
    }

    _logger.LogInformation("{0} Id {1} stopped reading", nameof(WindowsEventPollingSource), Id);
}