src/Elastic.Serilog.Sinks/ElasticsearchSink.cs (113 lines of code) (raw):

using System; using System.Collections.Generic; using System.Linq; using Elastic.Channels.Buffers; using Elastic.Channels.Diagnostics; using Elastic.CommonSchema; using Elastic.CommonSchema.Serilog; using Elastic.Ingest.Elasticsearch; using Elastic.Ingest.Elasticsearch.CommonSchema; using Elastic.Ingest.Elasticsearch.DataStreams; using Elastic.Ingest.Elasticsearch.Serialization; using Elastic.Transport; using Elastic.Transport.Products.Elasticsearch; using Serilog.Core; using Serilog.Debugging; using Serilog.Events; namespace Elastic.Serilog.Sinks { /// <summary> /// A read only view of the options provided to <see cref="ElasticsearchSink"/> /// </summary> public interface IElasticsearchSinkOptions { /// <inheritdoc cref="BootstrapMethod"/> BootstrapMethod BootstrapMethod { get; } /// <inheritdoc cref="IEcsTextFormatterConfiguration"/> IEcsTextFormatterConfiguration EcsTextFormatterConfiguration { get; } /// <inheritdoc cref="DataStreamName"/> public DataStreamName DataStream { get; } /// <summary> /// The ILM Policy to apply, see the following for more details: /// <para>https://www.elastic.co/guide/en/elasticsearch/reference/current/index-lifecycle-management.html</para> /// Defaults to `logs` which is shipped by default with Elasticsearch /// </summary> public string? IlmPolicy { get; } } /// <summary> /// Provides configuration options to <see cref="ElasticsearchSink"/> to control how and where data gets written /// </summary> public class ElasticsearchSinkOptions : ElasticsearchSinkOptions<EcsDocument> { /// <inheritdoc cref="ElasticsearchSinkOptions"/> public ElasticsearchSinkOptions() { } /// <inheritdoc cref="ElasticsearchSinkOptions"/> public ElasticsearchSinkOptions(ITransport transport) : base(transport) { } } /// <inheritdoc cref="ElasticsearchSinkOptions{TEcsDocument}"/> public class ElasticsearchSinkOptions<TEcsDocument> : IElasticsearchSinkOptions where TEcsDocument : EcsDocument, new() { /// <inheritdoc cref="ElasticsearchSinkOptions"/> public ElasticsearchSinkOptions() : this(new DistributedTransport(TransportHelper.Default())) { } /// <inheritdoc cref="ElasticsearchSinkOptions"/> public ElasticsearchSinkOptions(ITransport transport) => Transport = transport; /// <inheritdoc cref="ITransport{TConfiguration}"/> internal ITransport Transport { get; } IEcsTextFormatterConfiguration IElasticsearchSinkOptions.EcsTextFormatterConfiguration => TextFormatting; /// <inheritdoc cref="EcsTextFormatterConfiguration{TEcsDocument}"/> public EcsTextFormatterConfiguration<TEcsDocument> TextFormatting { get; set; } = new(); /// <inheritdoc cref="DataStreamName"/> public DataStreamName DataStream { get; set; } = new("logs", "dotnet"); /// <summary> /// Allows you to configure the <see cref="EcsDataStreamChannel{TEcsDocument}"/> used by the sink to send data to Elasticsearch /// </summary> public Action<DataStreamChannelOptions<TEcsDocument>>? ConfigureChannel { get; set; } /// <summary> /// Allows programmatic access to active channel diagnostics listener when its created. /// </summary> public Action<IChannelDiagnosticsListener>? ChannelDiagnosticsCallback { get; set; } /// <inheritdoc cref="BootstrapMethod"/> public BootstrapMethod BootstrapMethod { get; set; } /// <inheritdoc cref="IElasticsearchSinkOptions.IlmPolicy"/> public string? IlmPolicy { get; set; } /// <summary> /// Provide an explicit minimum <see cref="LogEventLevel"/> for the Elasticsearch sink. /// <para>This allows you to separately configure the sink to filter out messages.</para> /// <para>E.g: Configure default logging at <see cref="LogEventLevel.Verbose"/> but only write <see cref="LogEventLevel.Error"/> /// to Elasticsearch</para> /// </summary> public LogEventLevel? MinimumLevel { get; set; } /// <summary> /// A switch allowing the pass-through minimum level to be changed at runtime. /// <para>Takes precedence over <see cref="MinimumLevel"/> if both are configured</para> /// </summary> public LoggingLevelSwitch? LevelSwitch { get; set; } } /// <summary> /// This sink allows you to write serilog logs directly to Elasticsearch or Elastic Cloud /// </summary> public class ElasticsearchSink : ElasticsearchSink<EcsDocument> { /// <inheritdoc cref="ElasticsearchSink"/>> public ElasticsearchSink(ElasticsearchSinkOptions options) : base(options) {} } /// <inheritdoc cref="ElasticsearchSink"/>> public class ElasticsearchSink<TEcsDocument> : ILogEventSink, IDisposable where TEcsDocument : EcsDocument, new() { private readonly EcsTextFormatterConfiguration<TEcsDocument> _formatterConfiguration; private readonly EcsDataStreamChannel<TEcsDocument> _channel; /// <inheritdoc cref="IElasticsearchSinkOptions"/> public IElasticsearchSinkOptions Options { get; } /// <inheritdoc cref="ElasticsearchSink"/>> public ElasticsearchSink(ElasticsearchSinkOptions<TEcsDocument> options) { Options = options; _formatterConfiguration = options.TextFormatting; var channelOptions = new DataStreamChannelOptions<TEcsDocument>(options.Transport) { DataStream = options.DataStream }; options.ConfigureChannel?.Invoke(channelOptions); _channel = new EcsDataStreamChannel<TEcsDocument>(channelOptions, new [] { new SelfLogCallbackListener<TEcsDocument>(options)}); if (_channel.DiagnosticsListener != null) options.ChannelDiagnosticsCallback?.Invoke(_channel.DiagnosticsListener); _channel.BootstrapElasticsearch(options.BootstrapMethod, options.IlmPolicy); } /// <inheritdoc cref="ILogEventSink.Emit"/> public void Emit(LogEvent logEvent) { var ecsDoc = LogEventConverter.ConvertToEcs(logEvent, _formatterConfiguration); _channel.TryWrite(ecsDoc); } /// <summary> Disposes and flushed <see cref="EcsDataStreamChannel{TEcsDocument}"/> </summary> public void Dispose() => _channel.Dispose(); } internal class SelfLogCallbackListener<TEcsDocument> : IChannelCallbacks<TEcsDocument, BulkResponse> where TEcsDocument : EcsDocument, new() { public Action<Exception>? ExportExceptionCallback { get; } public Action<BulkResponse, IWriteTrackingBuffer>? ExportResponseCallback { get; } // ReSharper disable UnassignedGetOnlyAutoProperty public Action<int, int>? ExportItemsAttemptCallback { get; } public Action<IReadOnlyCollection<TEcsDocument>>? ExportMaxRetriesCallback { get; } public Action<IReadOnlyCollection<TEcsDocument>>? ExportRetryCallback { get; } public Action? PublishToInboundChannelCallback { get; } public Action? PublishToInboundChannelFailureCallback { get; } public Action? PublishToOutboundChannelCallback { get; } public Action? OutboundChannelStartedCallback { get; } public Action? OutboundChannelExitedCallback { get; } public Action? InboundChannelStartedCallback { get; } public Action? PublishToOutboundChannelFailureCallback { get; } public Action? ExportBufferCallback { get; } public Action<int>? ExportRetryableCountCallback { get; } // ReSharper enable UnassignedGetOnlyAutoProperty public SelfLogCallbackListener(ElasticsearchSinkOptions<TEcsDocument> options) { ExportExceptionCallback = e => { SelfLog.WriteLine("Observed an exception while writing to {0}", options.DataStream); SelfLog.WriteLine("{0}", e); }; ExportResponseCallback = (response, _) => { if (response == null) return; if (response.TryGetElasticsearchServerError(out var error)) SelfLog.WriteLine("{0}", error); // ReSharper disable once ConditionalAccessQualifierIsNonNullableAccordingToAPIContract var errorItems = response.Items?.Where(i => i.Status >= 300).ToArray() ?? Array.Empty<BulkResponseItem>(); foreach (var errorItem in errorItems) SelfLog.WriteLine("{0}", $"Failed to {errorItem.Action} document status: ${errorItem.Status}, error: ${errorItem.Error}"); }; } } }