Source/NuGetGallery.Worker/JobRunner.cs (135 lines of code) (raw):

using System; using System.Collections.Generic; using System.ComponentModel.Composition; using System.Linq; using System.Reactive; using System.Reactive.Linq; using System.Reactive.Subjects; using NLog; using NuGetGallery.Worker.Jobs; namespace NuGetGallery.Worker { [Export] public class JobRunner { private AsyncSubject<Unit> _subject = new AsyncSubject<Unit>(); private Logger _logger = LogManager.GetLogger("JobRunner"); private Settings _settings; public IDictionary<string, WorkerJob> Jobs { get; private set; } [ImportingConstructor] public JobRunner([Import(AllowDefault = true)] Settings settings, [ImportMany] IEnumerable<WorkerJob> jobs) { _settings = settings ?? new Settings(); Jobs = jobs.ToDictionary(j => j.GetType().Name, StringComparer.OrdinalIgnoreCase); _subject.OnNext(Unit.Default); foreach (var job in Jobs.Values) { try { job.Initialize(_settings); } catch (Exception ex) { _logger.ErrorException(String.Format("{2} Initializing '{0}': {1}", job.GetType().Name, ex.Message, ex.GetType().Name), ex); } } } public void Stop() { _subject.OnCompleted(); _logger.Info("Stopped Job Runner"); } public void Run() { Run(Jobs); } void Run(IDictionary<string, WorkerJob> jobs) { _logger.Info("Scheduling Jobs..."); // Set up the schedules IDisposable[] tokens; try { tokens = jobs.Select(job => { var startTime = DateTimeOffset.UtcNow + job.Value.Offset; _logger.Debug("Scheduling '{0}' to run every '{1}' starting at '{2}'.", job.Value.GetType().Name, job.Value.Period, startTime.ToLocalTime().ToString("yyyy-MM-dd HH:mm:ss")); return Observable.Timer(startTime, job.Value.Period) .Subscribe(_ => RunJob(job.Key, job.Value)); }).ToArray(); } catch (Exception ex) { _logger.ErrorException(String.Format("Error scheduling jobs: {0}", ex.Message), ex); return; } // Wait for a completion message _logger.Info("Ready at {0}. Waiting for Shutdown", DateTime.Now); // Wait for the system to shut down, but perform a heartbeat every minute. using (Observable.Interval(TimeSpan.FromMinutes(1)).Subscribe(t => _logger.Info("Heartbeat tick. Host is still running."))) { _subject.Wait(); } _logger.Info("Shutting down jobs..."); foreach (var token in tokens) { token.Dispose(); } } public void RunSingleJob(string name) { _logger.Info("Running " + name); WorkerJob job; if (!Jobs.TryGetValue(name, out job)) { _logger.Error("No such job: " + name); throw new InvalidOperationException("No such job: " + name); } RunJob(name, job); } public void RunSingleJobContinuously(string name) { WorkerJob job; if (!Jobs.TryGetValue(name, out job)) { _logger.Error("No such job: " + name); throw new InvalidOperationException("No such job: " + name); } Run(new Dictionary<string, WorkerJob> { { name, job } }); } public void OnStop() { foreach (var job in Jobs) { _logger.Debug("Cleaning up Job '{0}'", job.Key); } } public bool OnStart() { foreach (var job in Jobs) { _logger.Debug("Initializing Job '{0}'", job.Key); } return true; } private void RunJob(string name, WorkerJob job) { try { _logger.Debug("Executing Job '{0}'", name); DateTime before = DateTime.UtcNow; try { job.RunOnce(); DateTime after = DateTime.UtcNow; } catch (Exception ex) { DateTime after = DateTime.UtcNow; _logger.ErrorException(String.Format("Error Executing Job '{0}': {1}", name, ex.Message), ex); } } catch (Exception ex) { _logger.ErrorException(String.Format("Infrastructure Error Executing Job '{0}': {1}", name, ex.Message), ex); } } } }