in Amazon.KinesisTap.Hosting/SessionManager.cs [164:233]
/// <summary>
/// Long-running loop that reacts to configuration-change notifications.
/// Each time the <c>_configChanged</c> flag has been raised it polls the default
/// configuration file, stops sessions whose configuration file has disappeared from
/// the extra-configuration directory, and then polls every remaining extra
/// configuration file. Between iterations it waits
/// <c>ConfigChangePollingIntervalMs</c> milliseconds.
/// </summary>
/// <param name="stopToken">Token that ends the loop when cancellation is requested.</param>
/// <returns>A task that completes once the loop has exited.</returns>
private async Task ConfigChangePoller(CancellationToken stopToken)
{
    var extraConfigs = new HashSet<string>();
    while (!stopToken.IsCancellationRequested)
    {
        try
        {
            // Atomically read and reset the change flag so that a notification raised
            // while this iteration is running is picked up by the next one.
            var shouldPoll = Interlocked.Exchange(ref _configChanged, 0) > 0;
            if (shouldPoll)
            {
                // poll the default session first
                await PollConfig(_defaultConfigFilePath, true, stopToken);

                // Without a usable directory path there is nothing to reconcile. The guard also
                // keeps the prefix test below from throwing (null) or matching every key (empty).
                if (!string.IsNullOrWhiteSpace(_extraConfigDirPath))
                {
                    // Get the set of extra config files. A missing directory is treated as an
                    // empty set so that sessions started from it are still stopped below.
                    extraConfigs.Clear();
                    if (Directory.Exists(_extraConfigDirPath))
                    {
                        var searchExtraConfigs = Directory.EnumerateFiles(_extraConfigDirPath, "*.json", SearchOption.TopDirectoryOnly);
                        foreach (var f in searchExtraConfigs)
                        {
                            if (!ShouldIgnoreConfigFile(f))
                            {
                                extraConfigs.Add(f);
                            }
                        }
                    }

                    // List the sessions that are currently running but are no longer present in the list of configuration files.
                    // Exclude configurations that have been loaded from another location, since these are managed by an external component.
                    // Paths are compared ordinally, consistent with the HashSet<string> lookup.
                    var sessionsToStop = ConfigPathToSessionMap
                        .Where(kv => kv.Key != _defaultConfigFilePath
                            && !extraConfigs.Contains(kv.Key)
                            && kv.Key.StartsWith(_extraConfigDirPath, StringComparison.Ordinal))
                        .Select(kv => kv.Key)
                        .ToList(); // Snapshot the keys, because the map is modified in the loop below.

                    foreach (var configPath in sessionsToStop)
                    {
                        stopToken.ThrowIfCancellationRequested();
                        if (ConfigPathToSessionMap.TryRemove(configPath, out var removedSession))
                        {
                            // always stop the session AFTER it's been removed from the mapping;
                            // 'default' is passed instead of stopToken for the termination call.
                            await TerminateSession(removedSession, default);
                        }
                    }

                    // poll and start sessions as necessary
                    foreach (var configFile in extraConfigs)
                    {
                        stopToken.ThrowIfCancellationRequested();
                        await PollConfig(configFile, false, stopToken);
                    }
                }
            }

            await Task.Delay(ConfigChangePollingIntervalMs, stopToken);
        }
        catch (OperationCanceledException)
        {
            // Cancellation was requested: leave the loop quietly.
            break;
        }
        catch (Exception ex)
        {
            // Any other failure is logged and the loop keeps running.
            _logger.LogError(ex, "Error watching config file changes.");
        }
    }
}