in Amazon.KinesisTap.Windows/EventLogSource.cs [91:175]
private async Task Execution(CancellationToken stopToken)
{
try
{
await InitializeBookmarkLocation(stopToken);
}
catch (OperationCanceledException) when (stopToken.IsCancellationRequested)
{
return;
}
while (!stopToken.IsCancellationRequested)
{
try
{
if (_watcher is not null)
{
_watcher.EventRecordWritten -= OnEventRecordWritten;
_watcher.Dispose();
_logger.LogInformation("Resetting event log watcher");
}
var readExistingEvent = _logStartedAfterSource ||
InitialPosition == InitialPositionEnum.BOS ||
InitialPosition == InitialPositionEnum.Timestamp ||
_eventBookmark != null;
_watcher = await CreateWatcher(readExistingEvent, stopToken);
_watcher.EventRecordWritten += OnEventRecordWritten;
// set the 'watcherStarting' flag so the callback can cancel
Interlocked.Exchange(ref _watcherStarting, 1);
// yield thread execution before enabling watcher, since this operation can be long running
await Task.Yield();
_watcher.Enabled = true;
Interlocked.Exchange(ref _watcherStarting, 0);
}
catch (OperationCanceledException)
{
if (stopToken.IsCancellationRequested)
{
if (_watcher is not null)
{
_watcher.Enabled = false;
_watcher.EventRecordWritten -= OnEventRecordWritten;
_watcher.Dispose();
}
break;
}
continue;
}
catch (EventLogException ele) when (ele.Message.Contains("handle is invalid"))
{
_logger.LogWarning(ele, "Error while reading log {0}", _logName);
continue;
}
try
{
await _eventExceptionChannel.Reader.WaitToReadAsync(stopToken);
var reset = false;
while (_eventExceptionChannel.Reader.TryRead(out var ex))
{
reset = true;
}
if (reset)
{
continue;
}
}
catch (OperationCanceledException) when (stopToken.IsCancellationRequested)
{
if (_watcher is not null)
{
_watcher.Enabled = false;
_watcher.EventRecordWritten -= OnEventRecordWritten;
_watcher.Dispose();
}
break;
}
}
}