src/WebJobs.Script.WebHost/Metrics/LinuxContainerLegionMetricsPublisher.cs (225 lines of code) (raw):
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Diagnostics;
using System.IO.Abstractions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Platform.Metrics.LinuxConsumption;
using Microsoft.Azure.WebJobs.Script.Config;
using Microsoft.Azure.WebJobs.Script.Diagnostics;
using Microsoft.Azure.WebJobs.Script.WebHost.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Microsoft.Azure.WebJobs.Script.WebHost.Metrics
{
    /// <summary>
    /// Metrics publisher for Linux containers. Once started it runs two one-shot timers that re-arm themselves:
    /// one samples memory usage every second (from cgroup data or the process working set, depending on
    /// <see cref="FunctionsHostingConfigOptions.IsCGroupMemoryMetricsEnabled"/>), and one periodically
    /// writes aggregated function execution metrics through <see cref="LegionMetricsFileManager"/>.
    /// </summary>
    public sealed class LinuxContainerLegionMetricsPublisher : IMetricsPublisher, IDisposable
    {
        private readonly ILinuxConsumptionMetricsTracker _metricsTracker;
        private readonly LegionMetricsFileManager _metricsFileManager;
        private readonly TimeSpan _memorySnapshotInterval = TimeSpan.FromMilliseconds(1000);
        private readonly TimeSpan _timerStartDelay = TimeSpan.FromSeconds(2);
        private readonly IOptionsMonitor<StandbyOptions> _standbyOptions;
        private readonly IDisposable _standbyOptionsOnChangeListener;
        private readonly IDisposable _hostingConfigOptionsOnChangeListener;
        private readonly IEnvironment _environment;
        private readonly ILogger _logger;
        private readonly IServiceProvider _serviceProvider;
        private readonly string _containerName;
        private readonly IOptionsMonitor<FunctionsHostingConfigOptions> _hostingConfigOptions;
        private IMetricsLogger _metricsLogger;
        private TimeSpan _metricPublishInterval;
        private Process _process;
        private Timer _processMonitorTimer;
        private Timer _metricsPublisherTimer;
        private bool _initialized = false;

        // Last observed value of the cgroup memory metrics flag; used only to log transitions.
        private bool _isCGroupMemoryMetricsEnabled = false;

        // Set to 1 by the first standby-options notification that reports InStandbyMode == false,
        // so later notifications do not start the publisher again.
        private int _startedOnStandbyExit = 0;

        /// <summary>
        /// Creates the publisher. If the host is not in standby mode the publisher starts immediately;
        /// otherwise starting is deferred until standby options report <c>InStandbyMode == false</c>.
        /// </summary>
        /// <param name="environment">Host environment (required).</param>
        /// <param name="standbyOptions">Standby options monitor used to detect leaving standby mode (required).</param>
        /// <param name="options">Publisher options: container name, metrics file path and publish interval (required).</param>
        /// <param name="logger">Logger (required).</param>
        /// <param name="fileSystem">File system abstraction passed to the metrics file manager.</param>
        /// <param name="metricsTracker">Tracker that aggregates function and memory activity (required).</param>
        /// <param name="serviceProvider">Service provider used to resolve <see cref="IMetricsLogger"/> lazily (required).</param>
        /// <param name="functionsHostingConfigOptions">Hosting config options monitor (required).</param>
        /// <param name="metricsPublishIntervalMS">Optional override for the options' publish interval, in milliseconds.</param>
        /// <exception cref="ArgumentNullException">A required argument is null.</exception>
        public LinuxContainerLegionMetricsPublisher(IEnvironment environment, IOptionsMonitor<StandbyOptions> standbyOptions,
            IOptions<LinuxConsumptionLegionMetricsPublisherOptions> options, ILogger<LinuxContainerLegionMetricsPublisher> logger,
            IFileSystem fileSystem, ILinuxConsumptionMetricsTracker metricsTracker, IServiceProvider serviceProvider,
            IOptionsMonitor<FunctionsHostingConfigOptions> functionsHostingConfigOptions,
            int? metricsPublishIntervalMS = null)
        {
            _standbyOptions = standbyOptions ?? throw new ArgumentNullException(nameof(standbyOptions));
            _environment = environment ?? throw new ArgumentNullException(nameof(environment));
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _metricsTracker = metricsTracker ?? throw new ArgumentNullException(nameof(metricsTracker));
            _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
            _hostingConfigOptions = functionsHostingConfigOptions ?? throw new ArgumentNullException(nameof(functionsHostingConfigOptions));
            if (options is null)
            {
                throw new ArgumentNullException(nameof(options));
            }

            var publisherOptions = options.Value;
            _containerName = publisherOptions.ContainerName;
            _metricPublishInterval = TimeSpan.FromMilliseconds(metricsPublishIntervalMS ?? publisherOptions.MetricsPublishIntervalMS);

            // Keep roughly 15 minutes worth of files: files written per minute (rounded up) times 15.
            int maxFileCount = 15 * (int)Math.Ceiling(1.0 * 60 / _metricPublishInterval.TotalSeconds);
            _metricsFileManager = new LegionMetricsFileManager(publisherOptions.MetricsFilePath, fileSystem, logger, maxFileCount);

            // Both timers are created disarmed; Start() sets their due times.
            _processMonitorTimer = new Timer(OnProcessMonitorTimer, null, Timeout.Infinite, Timeout.Infinite);
            _metricsPublisherTimer = new Timer(OnFunctionMetricsPublishTimer, null, Timeout.Infinite, Timeout.Infinite);

            _metricsTracker.OnDiagnosticEvent += OnMetricsDiagnosticEvent;

            // Seed from the current configuration so the change handler only logs real transitions.
            _isCGroupMemoryMetricsEnabled = _hostingConfigOptions.CurrentValue?.IsCGroupMemoryMetricsEnabled ?? false;

            if (_standbyOptions.CurrentValue.InStandbyMode)
            {
                // Defer starting until the host leaves standby mode.
                _standbyOptionsOnChangeListener = _standbyOptions.OnChange(o => OnStandbyOptionsChange(o));
            }
            else
            {
                Start();
            }

            _hostingConfigOptionsOnChangeListener = _hostingConfigOptions.OnChange(OnHostingConfigOptionsChanged);
        }

        // Resolved lazily from the service provider on first use.
        private IMetricsLogger MetricsLogger => _metricsLogger ??= _serviceProvider.GetRequiredService<IMetricsLogger>();

        /// <summary>
        /// Logs when the cgroup memory metrics flag changes. The memory sampler reads the flag directly
        /// from the options monitor; this handler only tracks the value for logging.
        /// </summary>
        private void OnHostingConfigOptionsChanged(FunctionsHostingConfigOptions newOptions)
        {
            if (newOptions.IsCGroupMemoryMetricsEnabled != _isCGroupMemoryMetricsEnabled)
            {
                _logger.LogInformation("CGroup memory metrics enabled: {Enabled}", newOptions.IsCGroupMemoryMetricsEnabled);
                _isCGroupMemoryMetricsEnabled = newOptions.IsCGroupMemoryMetricsEnabled;
            }
        }

        /// <summary>
        /// Captures the current process (used for working-set sampling) and enables activity recording.
        /// </summary>
        public void Initialize()
        {
            // Process.GetCurrentProcess() returns a new IDisposable component on every call. Reuse the
            // instance already held so repeated initialization does not leak one per call.
            _process ??= Process.GetCurrentProcess();
            _initialized = true;
        }

        /// <summary>
        /// Initializes the publisher and arms both timers: memory sampling after a short start delay,
        /// metrics publishing after one publish interval.
        /// </summary>
        public void Start()
        {
            Initialize();

            // start the timers by setting the due time
            SetTimerInterval(_processMonitorTimer, _timerStartDelay);
            SetTimerInterval(_metricsPublisherTimer, _metricPublishInterval);

            _logger.LogInformation("Starting metrics publisher for container : {ContainerName}.", _containerName);
        }

        /// <summary>
        /// Starts the publisher when the host leaves standby mode.
        /// </summary>
        private void OnStandbyOptionsChange(StandbyOptions standbyOptions)
        {
            // Options change notifications are not guaranteed to be one-shot. Only the first notification
            // reporting InStandbyMode == false starts the publisher, so later ones do not re-arm the timers
            // (delaying the next publish) or log the start again.
            if (!standbyOptions.InStandbyMode && Interlocked.Exchange(ref _startedOnStandbyExit, 1) == 0)
            {
                Start();
            }
        }

        /// <summary>
        /// Records a function execution event with the metrics tracker. No-op until <see cref="Initialize"/> has run.
        /// </summary>
        /// <remarks>
        /// <paramref name="executionStage"/> is parsed into <see cref="FunctionExecutionStage"/>; if parsing fails,
        /// the enum's default value is recorded.
        /// </remarks>
        public void AddFunctionExecutionActivity(string functionName, string invocationId, int concurrency, string executionStage, bool success, long executionTimeSpan, string executionId, DateTime eventTimeStamp, DateTime functionStartTime)
        {
            if (!_initialized)
            {
                return;
            }

            Enum.TryParse(executionStage, out FunctionExecutionStage functionExecutionStage);

            FunctionActivity activity = new FunctionActivity
            {
                FunctionName = functionName,
                InvocationId = invocationId,
                Concurrency = concurrency,
                ExecutionStage = functionExecutionStage,
                ExecutionId = executionId,
                IsSucceeded = success,
                ExecutionTimeSpanInMs = executionTimeSpan,
                EventTimeStamp = eventTimeStamp,
                StartTime = functionStartTime
            };

            _metricsTracker.AddFunctionActivity(activity);
        }

        /// <summary>
        /// Records a memory sample (in bytes) with the metrics tracker. No-op until <see cref="Initialize"/> has run.
        /// </summary>
        public void AddMemoryActivity(DateTime timeStampUtc, long data)
        {
            if (!_initialized)
            {
                return;
            }

            var memoryActivity = new MemoryActivity
            {
                CommitSizeInBytes = data,
                EventTimeStamp = timeStampUtc
            };

            _metricsTracker.AddMemoryActivity(memoryActivity);
        }

        // Timer callback. async void is needed for the TimerCallback signature; OnPublishMetricsAsync
        // catches all non-fatal exceptions itself.
        private async void OnFunctionMetricsPublishTimer(object state)
        {
            await OnPublishMetricsAsync();
        }

        /// <summary>
        /// Publishes the tracker's aggregated metrics, if any, to the metrics file manager, then re-arms the publish timer.
        /// Non-fatal exceptions are logged and not rethrown.
        /// </summary>
        internal async Task OnPublishMetricsAsync()
        {
            try
            {
                if (_metricsTracker.TryGetMetrics(out LinuxConsumptionMetrics trackedMetrics))
                {
                    var metricsToPublish = new Metrics
                    {
                        FunctionActivity = trackedMetrics.FunctionActivity,
                        ExecutionCount = trackedMetrics.FunctionExecutionCount,
                        ExecutionTimeMS = trackedMetrics.FunctionExecutionTimeMS
                    };

                    await _metricsFileManager.PublishMetricsAsync(metricsToPublish);
                }
            }
            catch (Exception ex) when (!ex.IsFatal())
            {
                // ensure no background exceptions escape
                _logger.LogError(ex, "Error publishing metrics.");
            }
            finally
            {
                // The timer is one-shot; re-arm it for the next interval.
                SetTimerInterval(_metricsPublisherTimer, _metricPublishInterval);
            }
        }

        /// <summary>
        /// Samples current memory usage and records it, then re-arms the sampling timer.
        /// Uses cgroup data when enabled in hosting config, otherwise the process working set.
        /// </summary>
        private void OnProcessMonitorTimer(object state)
        {
            try
            {
                long memoryUsageInBytes;
                if (_hostingConfigOptions.CurrentValue.IsCGroupMemoryMetricsEnabled)
                {
                    memoryUsageInBytes = CgroupMemoryUsageHelper.GetMemoryUsageInBytes(_logger);
                }
                else
                {
                    // Read the field once so Refresh and WorkingSet64 act on the same instance.
                    // Refresh discards cached process info so the working set is current.
                    Process process = _process;
                    process.Refresh();
                    memoryUsageInBytes = process.WorkingSet64;
                }

                // Zero readings are skipped rather than recorded.
                if (memoryUsageInBytes != 0)
                {
                    AddMemoryActivity(DateTime.UtcNow, memoryUsageInBytes);
                }
            }
            catch (Exception e)
            {
                // throwing this exception will mask other underlying exceptions.
                // Log and let other interesting exceptions bubble up.
                _logger.LogError(e, nameof(OnProcessMonitorTimer));
            }
            finally
            {
                SetTimerInterval(_processMonitorTimer, _memorySnapshotInterval);
            }
        }

        /// <summary>
        /// Arms <paramref name="timer"/> to fire once after <paramref name="dueTime"/>, with no periodic repeat.
        /// A null timer is ignored; a race with Dispose is tolerated; other errors are logged.
        /// </summary>
        private void SetTimerInterval(Timer timer, TimeSpan dueTime)
        {
            try
            {
                timer?.Change(dueTime, Timeout.InfiniteTimeSpan);
            }
            catch (ObjectDisposedException)
            {
                // might race with dispose
            }
            catch (Exception e)
            {
                _logger.LogError(e, nameof(SetTimerInterval));
            }
        }

        // Forwards tracker diagnostic events to the metrics logger.
        private void OnMetricsDiagnosticEvent(object sender, DiagnosticEventArgs e)
        {
            MetricsLogger.LogEvent(e.EventName);
        }

        public void OnFunctionStarted(string functionName, string invocationId)
        {
            // nothing to do
        }

        public void OnFunctionCompleted(string functionName, string invocationId)
        {
            // nothing to do
        }

        /// <summary>
        /// Stops and disposes the timers, releases the captured process, unsubscribes from the tracker
        /// and disposes the options change listeners.
        /// </summary>
        public void Dispose()
        {
            _processMonitorTimer?.Dispose();
            _processMonitorTimer = null;

            _metricsPublisherTimer?.Dispose();
            _metricsPublisherTimer = null;

            _process?.Dispose();
            _process = null;

            _metricsTracker.OnDiagnosticEvent -= OnMetricsDiagnosticEvent;

            _standbyOptionsOnChangeListener?.Dispose();
            _hostingConfigOptionsOnChangeListener?.Dispose();
        }

        internal class Metrics
        {
            /// <summary>
            /// Gets or sets a measure of the function activity for the interval.
            /// </summary>
            public long FunctionActivity { get; set; }

            /// <summary>
            /// Gets or sets the total execution duration for all functions during this interval.
            /// </summary>
            public long ExecutionTimeMS { get; set; }

            /// <summary>
            /// Gets or sets the total number of functions invocations that
            /// completed during the interval.
            /// </summary>
            public long ExecutionCount { get; set; }
        }
    }
}