src/Elastic.Extensions.Logging/ElasticsearchLoggerProvider.cs (175 lines of code) (raw):

// Licensed to Elasticsearch B.V under one or more agreements. // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Threading; using System.Threading.Tasks; using Elastic.Channels; using Elastic.Channels.Diagnostics; using Elastic.Extensions.Logging.Options; using Elastic.Ingest.Elasticsearch; using Elastic.Ingest.Elasticsearch.CommonSchema; using Elastic.Ingest.Elasticsearch.DataStreams; using Elastic.Ingest.Elasticsearch.Indices; using Elastic.Transport; using Elastic.Transport.Products.Elasticsearch; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; namespace Elastic.Extensions.Logging { /// <summary> /// An <see cref="ILoggerProvider"/> implementation that exposes a way to create <see cref="ElasticsearchLogger"/> /// instances to <see cref="LoggerFactory"/> /// </summary> [ProviderAlias("Elasticsearch")] public class ElasticsearchLoggerProvider : ILoggerProvider, ISupportExternalScope, IChannelProvider { private readonly IChannelSetup[] _channelConfigurations; private readonly IOptionsMonitor<ElasticsearchLoggerOptions> _options; private readonly IDisposable? _optionsReloadToken; private IExternalScopeProvider? _scopeProvider; private IBufferedChannel<LogEvent> _shipper; private static readonly LogEventWriter LogEventWriterInstance = new() { WriteToStreamAsync = static async (stream, logEvent, ctx) => await logEvent.SerializeAsync(stream, ctx).ConfigureAwait(false), #if NETSTANDARD2_1_OR_GREATER || NET8_0_OR_GREATER WriteToArrayBuffer = static (arrayBufferWriter, logEvent) => { var serialized = logEvent.SerializeToUtf8Bytes(); // TODO - Performance optimisation to avoid array allocation var span = arrayBufferWriter.GetSpan(serialized.Length); serialized.AsSpan().CopyTo(span); arrayBufferWriter.Advance(serialized.Length); } #endif }; /// <inheritdoc cref="IChannelDiagnosticsListener"/> public IChannelDiagnosticsListener? DiagnosticsListener { get; } /// <inheritdoc cref="ElasticsearchLoggerProvider"/> public ElasticsearchLoggerProvider(IOptionsMonitor<ElasticsearchLoggerOptions> options, IEnumerable<IChannelSetup> channelConfigurations ) { _options = options ?? throw new ArgumentNullException(nameof(options)); if (channelConfigurations is null) throw new ArgumentNullException(nameof(channelConfigurations)); _channelConfigurations = channelConfigurations.ToArray(); _shipper = CreatIngestChannel(options.CurrentValue); _optionsReloadToken = _options.OnChange(o => ReloadShipper(o)); DiagnosticsListener = _shipper.DiagnosticsListener; } // ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global /// <summary> Returns <see cref="DateTimeOffset.UtcNow"/></summary> public static Func<DateTimeOffset> LocalDateTimeProvider { get; set; } = () => DateTimeOffset.UtcNow; /// <inheritdoc cref="ILoggerProvider.CreateLogger"/> public ILogger CreateLogger(string name) => new ElasticsearchLogger(name, this, _options.CurrentValue, _scopeProvider); /// <inheritdoc cref="IDisposable.Dispose"/> public void Dispose() { _optionsReloadToken?.Dispose(); _shipper.Dispose(); } /// <inheritdoc cref="ISupportExternalScope.SetScopeProvider"/> public void SetScopeProvider(IExternalScopeProvider scopeProvider) => _scopeProvider = scopeProvider; private static void SetupChannelOptions( IChannelSetup[] channelConfigurations, ElasticsearchChannelOptionsBase<LogEvent> channelOptions ) { foreach (var channelSetup in channelConfigurations) channelSetup.ConfigureChannel(channelOptions); } private static NodePool CreateNodePool(ElasticsearchLoggerOptions loggerOptions) { var shipTo = loggerOptions.ShipTo; var connectionPool = loggerOptions.ShipTo.NodePoolType; var nodeUris = loggerOptions.ShipTo.NodeUris?.ToArray() ?? Array.Empty<Uri>(); if (nodeUris.Length == 0 && connectionPool != NodePoolType.Cloud) return new SingleNodePool(new Uri("http://localhost:9200")); if (connectionPool == NodePoolType.SingleNode || connectionPool == NodePoolType.Unknown && nodeUris.Length == 1) return new SingleNodePool(nodeUris[0]); switch (connectionPool) { // TODO: Add option to randomize pool case NodePoolType.Unknown: case NodePoolType.Sniffing: return new SniffingNodePool(nodeUris); case NodePoolType.Static: return new StaticNodePool(nodeUris); case NodePoolType.Sticky: return new StickyNodePool(nodeUris); // case NodePoolType.StickySniffing: case NodePoolType.Cloud: if (shipTo.CloudId.IsNullOrEmpty()) throw new Exception($"Cloud {nameof(CloudNodePool)} requires '{nameof(ShipToOptions.CloudId)}' to be provided as well"); if (!shipTo.ApiKey.IsNullOrEmpty()) { var apiKeyCredentials = new ApiKey(shipTo.ApiKey); return new CloudNodePool(shipTo.CloudId, apiKeyCredentials); } if (!shipTo.Username.IsNullOrEmpty() && !shipTo.Password.IsNullOrEmpty()) { var basicAuthCredentials = new BasicAuthentication(shipTo.Username, shipTo.Password); return new CloudNodePool(shipTo.CloudId, basicAuthCredentials); } throw new Exception( $"Cloud requires either '{nameof(ShipToOptions.ApiKey)}' or" + $"'{nameof(ShipToOptions.Username)}' and '{nameof(ShipToOptions.Password)}" ); default: throw new ArgumentException($"Unrecognised connection pool type '{connectionPool}' specified in the configuration.", nameof(connectionPool)); } } private static ITransport CreateTransport(ElasticsearchLoggerOptions loggerOptions) { // TODO: Check if Uri has changed before recreating // TODO: Injectable factory? Or some way of testing. if (loggerOptions.Transport != null) return loggerOptions.Transport; var connectionPool = CreateNodePool(loggerOptions); var config = new TransportConfigurationDescriptor(connectionPool, productRegistration: ElasticsearchProductRegistration.Default); // Cloud sets authentication as required parameter in the constructor if (loggerOptions.ShipTo.NodePoolType != NodePoolType.Cloud) config = SetAuthenticationOnTransport(loggerOptions, config); var transport = new DistributedTransport<ITransportConfiguration>(config); return transport; } private static TransportConfigurationDescriptor SetAuthenticationOnTransport(ElasticsearchLoggerOptions loggerOptions, TransportConfigurationDescriptor config) { var apiKey = loggerOptions.ShipTo.ApiKey; var username = loggerOptions.ShipTo.Username; var password = loggerOptions.ShipTo.Password; if (!username.IsNullOrEmpty() && !password.IsNullOrEmpty()) config = config.Authentication(new BasicAuthentication(username, password)); else if (!apiKey.IsNullOrEmpty()) config = config.Authentication(new ApiKey(apiKey)); return config; } private void ReloadShipper(ElasticsearchLoggerOptions loggerOptions) { var newShipper = CreatIngestChannel(loggerOptions); var oldShipper = Interlocked.Exchange(ref _shipper, newShipper); oldShipper?.Dispose(); } private IBufferedChannel<LogEvent> CreatIngestChannel(ElasticsearchLoggerOptions loggerOptions) { var transport = CreateTransport(loggerOptions); if (loggerOptions.Index != null) { var indexChannelOptions = new IndexChannelOptions<LogEvent>(transport) { IndexFormat = loggerOptions.Index.Format, IndexOffset = loggerOptions.Index.IndexOffset, TimestampLookup = l => l.Timestamp, }; SetupChannelOptions(_channelConfigurations, indexChannelOptions); return new EcsIndexChannel<LogEvent>(indexChannelOptions); } else { var dataStreamNameOptions = loggerOptions.DataStream ?? new DataStreamNameOptions(); var indexChannelOptions = new DataStreamChannelOptions<LogEvent>(transport) { DataStream = new DataStreamName(dataStreamNameOptions.Type, dataStreamNameOptions.DataSet, dataStreamNameOptions.Namespace), EventWriter = LogEventWriterInstance }; SetupChannelOptions(_channelConfigurations, indexChannelOptions); var channel = new EcsDataStreamChannel<LogEvent>(indexChannelOptions); channel.BootstrapElasticsearch(loggerOptions.BootstrapMethod, loggerOptions.IlmPolicy); return channel; } } /// <inheritdoc cref="IChannelProvider.GetChannel"/> public IBufferedChannel<LogEvent> GetChannel() => _shipper; private sealed class LogEventWriter : IElasticsearchEventWriter<LogEvent> { #if NETSTANDARD2_1_OR_GREATER || NET8_0_OR_GREATER public Action<System.Buffers.ArrayBufferWriter<byte>, LogEvent>? WriteToArrayBuffer { get; set; } #endif public Func<Stream, LogEvent, CancellationToken, Task>? WriteToStreamAsync { get; set; } } } }