src/Elastic.Channels/Diagnostics/DiagnosticsBufferedChannel.cs (31 lines of code) (raw):
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Elastic.Channels.Diagnostics;
/// <summary>
/// A NOOP implementation of <see cref="IBufferedChannel{TEvent}"/> that:
/// <para> - tracks the number of times <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.ExportAsync"/> is invoked under <see cref="NoopBufferedChannel.ExportedBuffers"/> </para>
/// <para> - observes the maximum concurrent calls to <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.ExportAsync"/> under <see cref="NoopBufferedChannel.ObservedConcurrency"/> </para>
/// <para> - tracks how often the number of items in an exported buffer differs from <c>BatchExportSize</c> under <see cref="BufferMismatches"/> </para>
/// </summary>
public class DiagnosticsBufferedChannel : NoopBufferedChannel
{
	/// <summary>
	/// Creates a diagnostics channel from <paramref name="options"/>, registering a
	/// <see cref="ChannelDiagnosticsListener{TEvent,TResponse}"/> with the base channel.
	/// </summary>
	/// <param name="options">The buffer options forwarded to the base channel.</param>
	/// <param name="observeConcurrency">Forwarded as-is to the base channel constructor.</param>
	/// <param name="name">
	/// Name given to the diagnostics listener; defaults to <c>nameof(DiagnosticsBufferedChannel)</c> when <c>null</c>.
	/// </param>
	public DiagnosticsBufferedChannel(BufferOptions options, bool observeConcurrency = false, string? name = null)
		: base(options, new [] { new ChannelDiagnosticsListener<NoopEvent, NoopResponse>(name ?? nameof(DiagnosticsBufferedChannel)) }, observeConcurrency)
	{
	}

	/// <summary>
	/// Creates a diagnostics channel from fully specified channel <paramref name="options"/>, registering a
	/// <see cref="ChannelDiagnosticsListener{TEvent,TResponse}"/> with the base channel.
	/// </summary>
	/// <param name="options">The channel options forwarded to the base channel.</param>
	/// <param name="name">
	/// Name given to the diagnostics listener; defaults to <c>nameof(DiagnosticsBufferedChannel)</c> when <c>null</c>.
	/// </param>
	public DiagnosticsBufferedChannel(NoopChannelOptions options, string? name = null)
		: base(options, new [] { new ChannelDiagnosticsListener<NoopEvent, NoopResponse>(name ?? nameof(DiagnosticsBufferedChannel)) })
	{
	}

	// Only ever mutated through Interlocked so concurrent exports do not lose increments.
	private long _bufferMismatches;

	/// <summary>
	/// Keeps track of the number of exports whose buffer item count differed from <c>BatchExportSize</c>.
	/// </summary>
	/// <remarks>
	/// The value is read with <see cref="Interlocked.Read(ref long)"/> so that a 64-bit read cannot tear
	/// on 32-bit runtimes while another thread is incrementing the counter.
	/// </remarks>
	public long BufferMismatches => Interlocked.Read(ref _bufferMismatches);

	/// <inheritdoc cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.ExportAsync"/>
	protected override Task<NoopResponse> ExportAsync(ArraySegment<NoopEvent> buffer, CancellationToken ctx = default)
	{
		// Record every export whose item count is not exactly the configured batch size,
		// then defer to the base implementation for the actual (noop) export.
		if (BatchExportSize != buffer.Count)
		{
			Interlocked.Increment(ref _bufferMismatches);
		}

		return base.ExportAsync(buffer, ctx);
	}

	/// <summary>
	/// Provides a debug message to give insights to the machinery of <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}"/>
	/// </summary>
	/// <returns>The base channel's debug message followed by the current <see cref="BufferMismatches"/> count.</returns>
	public override string ToString() => $@"{base.ToString()}
{nameof(BufferMismatches)}: {BufferMismatches:N0}
------------------------------------------";
}