protected BufferedChannelBase()

in src/Elastic.Channels/BufferedChannelBase.cs [129:208]


	protected BufferedChannelBase(TChannelOptions options, ICollection<IChannelCallbacks<TEvent, TResponse>>? callbackListeners)
	{
		TokenSource = options.CancellationToken.HasValue
			? CancellationTokenSource.CreateLinkedTokenSource(options.CancellationToken.Value)
			: new CancellationTokenSource();
		Options = options;

		var listeners = callbackListeners == null ? new[] { Options } : callbackListeners.Concat(new[] { Options }).ToArray();
		DiagnosticsListener = listeners
			.Select(l => (l is IChannelDiagnosticsListener c) ? c : null)
			.FirstOrDefault(e => e != null);
		if (DiagnosticsListener == null && !options.DisableDiagnostics)
		{
			// if no debug listener was already provided but was requested explicitly create one.
			var l = new ChannelDiagnosticsListener<TEvent, TResponse>(GetType().Name);
			DiagnosticsListener = l;
			listeners = listeners.Concat(new[] { l }).ToArray();
		}
		_callbacks = new ChannelCallbackInvoker<TEvent, TResponse>(listeners);

		var maxIn = Math.Max(Math.Max(1, BufferOptions.InboundBufferMaxSize), BufferOptions.OutboundBufferMaxSize);
		var defaultMaxOut = Math.Max(1, BufferOptions.OutboundBufferMaxSize);
		var calculatedConcurrency = (int)Math.Ceiling(maxIn / (double)defaultMaxOut);
		var defaultConcurrency = Environment.ProcessorCount * 2;
		MaxConcurrency = BufferOptions.ExportMaxConcurrency ?? Math.Min(calculatedConcurrency, defaultConcurrency);

		// The minimum out buffer the max of (1 or OutboundBufferMaxSize) as long as it does not exceed InboundBufferMaxSize / (MaxConcurrency * 2)
		BatchExportSize = Math.Min(BufferOptions.InboundBufferMaxSize / (MaxConcurrency), Math.Max(1, BufferOptions.OutboundBufferMaxSize));
		DrainSize = Math.Min(100_000, Math.Min(BatchExportSize * 2, maxIn / 2));

		_taskList = new List<Task>(MaxConcurrency * 2);

		_throttleExportTasks = new SemaphoreSlim(MaxConcurrency, MaxConcurrency);
		_signal = options.BufferOptions.WaitHandle;
		_waitForOutboundRead = new CountdownEvent(1);
		OutChannel = Channel.CreateBounded<IOutboundBuffer<TEvent>>(
			new BoundedChannelOptions(MaxConcurrency * 4)
			{
				SingleReader = false,
				SingleWriter = true,
				// Stephen Toub comment: https://github.com/dotnet/runtime/issues/26338#issuecomment-393720727
				// AFAICT this is fine since we run in a dedicated long running task.
				AllowSynchronousContinuations = true,
				// wait does not block it simply signals that Writer.TryWrite should return false and be retried
				// DropWrite will make `TryWrite` always return true, which is not what we want.
				FullMode = BoundedChannelFullMode.Wait
			});

		//we don't expose the fact TEvent is nullable to the consumer
		Action<TEvent?>? itemDropped = options.BufferItemDropped is null ? null : e =>
		{
			if (e is not null)
				options.BufferItemDropped?.Invoke(e);
		};
		InChannel = Channel.CreateBounded<TEvent?>(new BoundedChannelOptions(maxIn)
		{
			SingleReader = true,
			SingleWriter = false,
			// Stephen Toub comment: https://github.com/dotnet/runtime/issues/26338#issuecomment-393720727
			// AFAICT this is fine since we run in a dedicated long running task.
			AllowSynchronousContinuations = true,
			// wait does not block it simply signals that Writer.TryWrite should return false and be retried
			// DropWrite will make `TryWrite` always return true, which is not what we want.
			FullMode = options.BufferOptions.BoundedChannelFullMode
		}, itemDropped);

		InboundBuffer = new InboundBuffer<TEvent>(BatchExportSize, BufferOptions.OutboundBufferMaxLifetime);

		_outTask = Task.Factory.StartNew(async () =>
				await ConsumeOutboundEventsAsync().ConfigureAwait(false),
			CancellationToken.None,
			TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness,
			TaskScheduler.Default);

		_inTask = Task.Factory.StartNew(async () =>
				await ConsumeInboundEventsAsync(BatchExportSize, BufferOptions.OutboundBufferMaxLifetime).ConfigureAwait(false),
			CancellationToken.None,
			TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness,
			TaskScheduler.Default);
	}