in src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs [57:92]
protected override List<(TEvent, BulkResponseItem)> Zip(BulkResponse response, IReadOnlyCollection<TEvent> page) =>
	// Pair each event in the page with the response item at the same position.
	page
		.Zip(response.Items, static (sentEvent, responseItem) => (sentEvent, responseItem))
		.ToList();
/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}.RetryEvent"/>
protected override bool RetryEvent((TEvent, BulkResponseItem) @event)
{
	// Only the response half of the pair matters: retry when its status is one of the retryable codes.
	var (_, responseItem) = @event;
	return RetryStatusCodes.Contains(responseItem.Status);
}
/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}.RejectEvent"/>
/// <remarks>
/// An item is rejected when its status lies outside the success range, i.e. it is below 200
/// or at/above 300. The upper bound is exclusive: a status of exactly 300 is not a success.
/// </remarks>
protected override bool RejectEvent((TEvent, BulkResponseItem) @event) =>
	@event.Item2.Status is < 200 or >= 300;
/// <inheritdoc cref="TransportChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}.ExportAsync(Elastic.Transport.ITransport,System.ArraySegment{TEvent},System.Threading.CancellationToken)"/>
protected override Task<BulkResponse> ExportAsync(ITransport transport, ArraySegment<TEvent> page, CancellationToken ctx = default)
{
	// When the caller passes the default token, use the channel's own token source instead.
	if (ctx == default)
	{
		ctx = TokenSource.Token;
	}
#if NETSTANDARD2_1
	// The option is marked obsolete so that external users do not set it.
#pragma warning disable CS0618
	if (Options.UseReadOnlyMemory)
#pragma warning restore CS0618
	{
		// Materialize the page as bytes first and post them as a single read-only memory body.
		var payload = BulkRequestDataFactory.GetBytes(page, Options, CreateBulkOperationHeader);
		return transport.RequestAsync<BulkResponse>(HttpMethod.POST, BulkPathAndQuery, PostData.ReadOnlyMemory(payload), ctx);
	}
#endif
#pragma warning disable IDE0022 // Use expression body for method
	// Stream the page into the request body; only the asynchronous writer does any work.
	var streamedBody = PostData.StreamHandler(
		page,
		(_, _) =>
		{
			/* NOT USED */
		},
		async (events, output, writeToken) =>
		{
			await BulkRequestDataFactory
				.WriteBufferToStreamAsync(events, output, Options, CreateBulkOperationHeader, writeToken)
				.ConfigureAwait(false);
		});
	return transport.RequestAsync<BulkResponse>(new (HttpMethod.POST, BulkPathAndQuery), streamedBody, ctx);
#pragma warning restore IDE0022 // Use expression body for method
}