in src/Elastic.Channels/BufferedChannelBase.cs [129:208]
protected BufferedChannelBase(TChannelOptions options, ICollection<IChannelCallbacks<TEvent, TResponse>>? callbackListeners)
{
	TokenSource = options.CancellationToken.HasValue
		? CancellationTokenSource.CreateLinkedTokenSource(options.CancellationToken.Value)
		: new CancellationTokenSource();
	Options = options;

	var listeners = callbackListeners == null ? new[] { Options } : callbackListeners.Concat(new[] { Options }).ToArray();
	DiagnosticsListener = listeners
		.Select(l => (l is IChannelDiagnosticsListener c) ? c : null)
		.FirstOrDefault(e => e != null);
	if (DiagnosticsListener == null && !options.DisableDiagnostics)
	{
		// If no diagnostics listener was provided but diagnostics are enabled, explicitly create one.
		var l = new ChannelDiagnosticsListener<TEvent, TResponse>(GetType().Name);
		DiagnosticsListener = l;
		listeners = listeners.Concat(new[] { l }).ToArray();
	}
	_callbacks = new ChannelCallbackInvoker<TEvent, TResponse>(listeners);
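
	// Concurrency defaults to the number of outbound batches needed to drain a full inbound buffer,
	// capped at twice the processor count, unless BufferOptions.ExportMaxConcurrency overrides it.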
	var maxIn = Math.Max(Math.Max(1, BufferOptions.InboundBufferMaxSize), BufferOptions.OutboundBufferMaxSize);
	var defaultMaxOut = Math.Max(1, BufferOptions.OutboundBufferMaxSize);
	var calculatedConcurrency = (int)Math.Ceiling(maxIn / (double)defaultMaxOut);
	var defaultConcurrency = Environment.ProcessorCount * 2;
	MaxConcurrency = BufferOptions.ExportMaxConcurrency ?? Math.Min(calculatedConcurrency, defaultConcurrency);

	// The batch export size is the max of (1, OutboundBufferMaxSize), as long as it does not exceed InboundBufferMaxSize / MaxConcurrency.
	BatchExportSize = Math.Min(BufferOptions.InboundBufferMaxSize / MaxConcurrency, Math.Max(1, BufferOptions.OutboundBufferMaxSize));
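	// DrainSize is capped at twice the batch export size, at half the inbound buffer, and at a hard ceiling of 100,000.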
	DrainSize = Math.Min(100_000, Math.Min(BatchExportSize * 2, maxIn / 2));
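
	// Track in-flight export tasks and throttle exports to at most MaxConcurrency at a time.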
	_taskList = new List<Task>(MaxConcurrency * 2);
	_throttleExportTasks = new SemaphoreSlim(MaxConcurrency, MaxConcurrency);
	_signal = options.BufferOptions.WaitHandle;
	_waitForOutboundRead = new CountdownEvent(1);
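
	// The outbound channel hands batched buffers to the export loop; bounding it at 4x concurrency
	// applies backpressure on the inbound loop when exporters fall behind.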
	OutChannel = Channel.CreateBounded<IOutboundBuffer<TEvent>>(
		new BoundedChannelOptions(MaxConcurrency * 4)
		{
			SingleReader = false,
			SingleWriter = true,
			// Stephen Toub comment: https://github.com/dotnet/runtime/issues/26338#issuecomment-393720727
			// AFAICT this is fine since we run in a dedicated long-running task.
			AllowSynchronousContinuations = true,
			// Wait does not block; it simply signals that Writer.TryWrite should return false and the write should be retried.
			// DropWrite would make TryWrite always return true, which is not what we want.
			FullMode = BoundedChannelFullMode.Wait
		});
	// We don't expose the fact that TEvent is nullable to the consumer.
	Action<TEvent?>? itemDropped = options.BufferItemDropped is null ? null : e =>
	{
		if (e is not null)
			options.BufferItemDropped?.Invoke(e);
	};
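
	// The inbound channel buffers individual events from producers until the inbound loop batches them into outbound buffers.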
	InChannel = Channel.CreateBounded<TEvent?>(new BoundedChannelOptions(maxIn)
	{
		SingleReader = true,
		SingleWriter = false,
		// Stephen Toub comment: https://github.com/dotnet/runtime/issues/26338#issuecomment-393720727
		// AFAICT this is fine since we run in a dedicated long-running task.
		AllowSynchronousContinuations = true,
		// Wait does not block; it simply signals that Writer.TryWrite should return false and the write should be retried.
		// DropWrite would make TryWrite always return true, which is not what we want.
		FullMode = options.BufferOptions.BoundedChannelFullMode
	}, itemDropped);
	InboundBuffer = new InboundBuffer<TEvent>(BatchExportSize, BufferOptions.OutboundBufferMaxLifetime);
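
	// Start the outbound and inbound consumer loops on dedicated long-running tasks.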
	_outTask = Task.Factory.StartNew(async () =>
		await ConsumeOutboundEventsAsync().ConfigureAwait(false),
		CancellationToken.None,
		TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness,
		TaskScheduler.Default);

	_inTask = Task.Factory.StartNew(async () =>
		await ConsumeInboundEventsAsync(BatchExportSize, BufferOptions.OutboundBufferMaxLifetime).ConfigureAwait(false),
		CancellationToken.None,
		TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness,
		TaskScheduler.Default);
}