in src/Elastic.Ingest.Elasticsearch/Serialization/BulkRequestDataFactory.cs [84:121]
public static async Task WriteBufferToStreamAsync<TEvent>(ArraySegment<TEvent> page, Stream stream,
    ElasticsearchChannelOptionsBase<TEvent> options, Func<TEvent, BulkOperationHeader> createHeaderFactory,
    CancellationToken ctx = default)
{
    // Writes every non-null event of `page` to `stream` as two LineFeed-terminated entries:
    // first the operation header returned by `createHeaderFactory`, then the event body.
    // Null events are skipped entirely (no header is written for them).
#if NETSTANDARD2_1_OR_GREATER
    var events = page;
#else
    // Targets below netstandard2.1 access the segment through its IReadOnlyList<T> view.
    IReadOnlyList<TEvent> events = page;
#endif
    var total = events.Count;

    // Deliberately an indexed loop: foreach over an ArraySegment performs badly, see
    // https://antao-almada.medium.com/how-to-use-span-t-and-memory-t-c0b126aae652
    // ReSharper disable once ForCanBeConvertedToForeach
    for (var index = 0; index < total; index++)
    {
        var current = events[index];
        if (current == null)
        {
            continue;
        }

        var header = createHeaderFactory(current);
        // Update operations get their body wrapped between DocUpdateHeaderStart and DocUpdateHeaderEnd.
        var wrapAsUpdate = header is UpdateOperation;

        // Header entry: serialized using the header's runtime type (header.GetType()).
        await JsonSerializer.SerializeAsync(stream, header, header.GetType(), SerializerOptions, ctx)
            .ConfigureAwait(false);
        await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false);

        if (wrapAsUpdate)
        {
            await stream.WriteAsync(DocUpdateHeaderStart, 0, DocUpdateHeaderStart.Length, ctx).ConfigureAwait(false);
        }

        // Body entry: a configured custom event writer takes precedence over the default serializer.
        var customWriter = options.EventWriter?.WriteToStreamAsync;
        if (customWriter == null)
        {
            await JsonSerializer.SerializeAsync(stream, current, SerializerOptions, ctx)
                .ConfigureAwait(false);
        }
        else
        {
            await customWriter(stream, current, ctx).ConfigureAwait(false);
        }

        if (wrapAsUpdate)
        {
            await stream.WriteAsync(DocUpdateHeaderEnd, 0, DocUpdateHeaderEnd.Length, ctx).ConfigureAwait(false);
        }

        await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false);
    }
}