edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/configsources/FileConfigSource.cs (113 lines of code) (raw):

// Copyright (c) Microsoft. All rights reserved. namespace Microsoft.Azure.Devices.Edge.Agent.Core.ConfigSources { using System; using System.IO; using System.Reactive; using System.Reactive.Linq; using System.Threading.Tasks; using Microsoft.Azure.Devices.Edge.Agent.Core.Serde; using Microsoft.Azure.Devices.Edge.Util; using Microsoft.Azure.Devices.Edge.Util.Concurrency; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; public class FileConfigSource : IConfigSource { const double FileChangeWatcherDebounceInterval = 500; readonly FileSystemWatcher watcher; readonly string configFilePath; readonly IDisposable watcherSubscription; readonly AtomicReference<DeploymentConfigInfo> current; readonly AsyncLock sync; readonly ISerde<DeploymentConfigInfo> serde; FileConfigSource(FileSystemWatcher watcher, DeploymentConfigInfo initial, IConfiguration configuration, ISerde<DeploymentConfigInfo> serde) { this.watcher = Preconditions.CheckNotNull(watcher, nameof(watcher)); this.Configuration = Preconditions.CheckNotNull(configuration, nameof(configuration)); this.current = new AtomicReference<DeploymentConfigInfo>(Preconditions.CheckNotNull(initial, nameof(initial))); this.serde = Preconditions.CheckNotNull(serde, nameof(serde)); this.configFilePath = Path.Combine(this.watcher.Path, this.watcher.Filter); this.sync = new AsyncLock(); this.watcherSubscription = Observable .FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>( h => this.watcher.Changed += h, h => this.watcher.Changed -= h) // Rx.NET's "Throttle" is really "Debounce". An unfortunate naming mishap. .Throttle(TimeSpan.FromMilliseconds(FileChangeWatcherDebounceInterval)) .Subscribe(this.WatcherOnChanged); this.watcher.EnableRaisingEvents = true; Events.Created(this.configFilePath); } public IConfiguration Configuration { get; } public static async Task<FileConfigSource> Create(string configFilePath, IConfiguration configuration, ISerde<DeploymentConfigInfo> serde) { Preconditions.CheckNotNull(serde, nameof(serde)); string path = Preconditions.CheckNonWhiteSpace(Path.GetFullPath(configFilePath), nameof(configFilePath)); if (!File.Exists(path)) { throw new FileNotFoundException("Invalid config file path", path); } string directoryName = Path.GetDirectoryName(path); string fileName = Path.GetFileName(path); DeploymentConfigInfo initial = await ReadFromDisk(path, serde); var watcher = new FileSystemWatcher(directoryName, fileName) { NotifyFilter = NotifyFilters.LastWrite }; return new FileConfigSource(watcher, initial, configuration, serde); } public Task<DeploymentConfigInfo> GetDeploymentConfigInfoAsync() => Task.FromResult(this.current.Value); public void Dispose() { this.watcherSubscription.Dispose(); this.watcher.Dispose(); } static async Task<DeploymentConfigInfo> ReadFromDisk(string path, ISerde<DeploymentConfigInfo> serde) { string json = await DiskFile.ReadAllAsync(path); DeploymentConfigInfo deploymentConfig = serde.Deserialize(json); return deploymentConfig; } void UpdateCurrent(DeploymentConfigInfo updated) { DeploymentConfigInfo snapshot = this.current.Value; if (!this.current.CompareAndSet(snapshot, updated)) { throw new InvalidOperationException("Invalid update current moduleset operation."); } } async void WatcherOnChanged(EventPattern<FileSystemEventArgs> args) { if ((args.EventArgs.ChangeType & WatcherChangeTypes.Changed) != WatcherChangeTypes.Changed) return; try { using (await this.sync.LockAsync()) { DeploymentConfigInfo newConfig = await ReadFromDisk(this.configFilePath, this.serde); this.UpdateCurrent(newConfig); } } catch (Exception ex) when (!ex.IsFatal()) { Events.NewConfigurationFailed(ex, this.configFilePath); } } static class Events { const int IdStart = AgentEventIds.FileConfigSource; static readonly ILogger Log = Logger.Factory.CreateLogger<FileConfigSource>(); enum EventIds { Created = IdStart, NewConfigurationFailed } public static void Created(string filename) { Log.LogDebug((int)EventIds.Created, $"FileConfigSource created with filename {filename}"); } public static void NewConfigurationFailed(Exception exception, string filename) { Log.LogError((int)EventIds.NewConfigurationFailed, exception, $"FileConfigSource failed reading new configuration file, {filename}"); } } } }