src/Elastic.Transport/Components/TransportClient/Content/RequestDataContent.cs (166 lines of code) (raw):

// Licensed to Elasticsearch B.V under one or more agreements. // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. // modified to be dedicated for BoundConfiguration only #if !NETFRAMEWORK using System; using System.Diagnostics.CodeAnalysis; using System.Diagnostics.Contracts; using System.IO; using System.IO.Compression; using System.Net; using System.Net.Http; using System.Net.Http.Headers; using System.Threading; using System.Threading.Tasks; namespace Elastic.Transport; /// <summary> /// Provides an <see cref="HttpContent"/> implementation that exposes an output <see cref="Stream"/> /// which can be written to directly. The ability to push data to the output stream differs from the /// <see cref="StreamContent"/> where data is pulled and not pushed. /// </summary> internal sealed class BoundConfigurationContent : HttpContent { private readonly BoundConfiguration _boundConfiguration; private readonly PostData? _postData; private readonly Func<BoundConfiguration, PostData?, CompleteTaskOnCloseStream, BoundConfigurationContent, TransportContext, CancellationToken, Task> _onStreamAvailableAsync; private readonly Action<BoundConfiguration, PostData?, CompleteTaskOnCloseStream, BoundConfigurationContent, TransportContext> _onStreamAvailable; private readonly CancellationToken _token; /// <summary> Constructor used in synchronous paths. </summary> public BoundConfigurationContent(BoundConfiguration boundConfiguration, PostData postData) { _boundConfiguration = boundConfiguration; _postData = postData; _token = default; Headers.TryAddWithoutValidation("Content-Type", boundConfiguration.ContentType); if (boundConfiguration.HttpCompression) Headers.ContentEncoding.Add("gzip"); _onStreamAvailable = OnStreamAvailable; _onStreamAvailableAsync = OnStreamAvailableAsync; } private static void OnStreamAvailable(BoundConfiguration boundConfiguration, PostData? postData, Stream stream, HttpContent content, TransportContext context) { if (postData == null) { stream.Dispose(); return; } if (boundConfiguration.HttpCompression) stream = new GZipStream(stream, CompressionMode.Compress, false); using (stream) postData.Write(stream, boundConfiguration.ConnectionSettings, boundConfiguration.DisableDirectStreaming); } /// <summary> Constructor used in asynchronous paths. </summary> public BoundConfigurationContent(BoundConfiguration boundConfiguration, CancellationToken token) { _boundConfiguration = boundConfiguration; _token = token; Headers.TryAddWithoutValidation("Content-Type", boundConfiguration.ContentType); if (boundConfiguration.HttpCompression) Headers.ContentEncoding.Add("gzip"); _onStreamAvailable = OnStreamAvailable; _onStreamAvailableAsync = OnStreamAvailableAsync; } private static async Task OnStreamAvailableAsync(BoundConfiguration boundConfiguration, PostData? postData, Stream stream, HttpContent content, TransportContext context, CancellationToken ctx = default) { if (postData == null) { #if NET6_0_OR_GREATER await stream.DisposeAsync().ConfigureAwait(false); #else stream.Dispose(); #endif return; } if (boundConfiguration.HttpCompression) stream = new GZipStream(stream, CompressionMode.Compress, false); #if NET6_0_OR_GREATER await using (stream.ConfigureAwait(false)) #else using (stream) #endif await postData.WriteAsync(stream, boundConfiguration.ConnectionSettings, boundConfiguration.DisableDirectStreaming, ctx).ConfigureAwait(false); } /// <summary> /// When this method is called, it calls the action provided in the constructor with the output /// stream to write to. Once the action has completed its work it closes the stream which will /// close this content instance and complete the HTTP request or response. /// </summary> /// <param name="stream">The <see cref="Stream"/> to which to write.</param> /// <param name="context">The associated <see cref="TransportContext"/>.</param> /// <returns>A <see cref="Task"/> instance that is asynchronously serializing the object's content.</returns> [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "Exception is passed as task result.")] protected override Task SerializeToStreamAsync(Stream stream, TransportContext context) => SerializeToStreamAsync(stream, context, default); #if NET6_0_OR_GREATER protected override #else private #endif async Task SerializeToStreamAsync(Stream stream, TransportContext context, CancellationToken cancellationToken) { var source = CancellationTokenSource.CreateLinkedTokenSource(_token, cancellationToken); var serializeToStreamTask = new TaskCompletionSource<bool>(); var wrappedStream = new CompleteTaskOnCloseStream(stream, serializeToStreamTask); await _onStreamAvailableAsync(_boundConfiguration, _postData, wrappedStream, this, context, source.Token).ConfigureAwait(false); await serializeToStreamTask.Task.ConfigureAwait(false); } #if NET6_0_OR_GREATER protected override void SerializeToStream(Stream stream, TransportContext context, CancellationToken _) { var serializeToStreamTask = new TaskCompletionSource<bool>(); using var wrappedStream = new CompleteTaskOnCloseStream(stream, serializeToStreamTask); _onStreamAvailable(_boundConfiguration, _postData, wrappedStream, this, context); //await serializeToStreamTask.Task.ConfigureAwait(false); } #endif /// <summary> /// Computes the length of the stream if possible. /// </summary> /// <param name="length">The computed length of the stream.</param> /// <returns><c>true</c> if the length has been computed; otherwise <c>false</c>.</returns> protected override bool TryComputeLength(out long length) { // We can't know the length of the content being pushed to the output stream. length = -1; return false; } internal class CompleteTaskOnCloseStream : DelegatingStream { private readonly TaskCompletionSource<bool> _serializeToStreamTask; public CompleteTaskOnCloseStream(Stream innerStream, TaskCompletionSource<bool> serializeToStreamTask) : base(innerStream) { Contract.Assert(serializeToStreamTask != null); _serializeToStreamTask = serializeToStreamTask; } protected override void Dispose(bool disposing) { _serializeToStreamTask.TrySetResult(true); base.Dispose(); } public override void Close() => _serializeToStreamTask.TrySetResult(true); } /// <summary> /// Stream that delegates to inner stream. /// This is taken from System.Net.Http /// </summary> internal abstract class DelegatingStream : Stream { private readonly Stream _innerStream; protected DelegatingStream(Stream innerStream) => _innerStream = innerStream ?? throw new ArgumentNullException(nameof(innerStream)); public override bool CanRead => _innerStream.CanRead; public override bool CanSeek => _innerStream.CanSeek; public override bool CanWrite => _innerStream.CanWrite; public override long Length => _innerStream.Length; public override long Position { get => _innerStream.Position; set => _innerStream.Position = value; } public override int ReadTimeout { get => _innerStream.ReadTimeout; set => _innerStream.ReadTimeout = value; } public override bool CanTimeout => _innerStream.CanTimeout; public override int WriteTimeout { get => _innerStream.WriteTimeout; set => _innerStream.WriteTimeout = value; } protected override void Dispose(bool disposing) { if (disposing) _innerStream.Dispose(); base.Dispose(disposing); } public override long Seek(long offset, SeekOrigin origin) => _innerStream.Seek(offset, origin); public override int Read(byte[] buffer, int offset, int count) => _innerStream.Read(buffer, offset, count); public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => _innerStream.ReadAsync(buffer, offset, count, cancellationToken); public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) => _innerStream.BeginRead(buffer, offset, count, callback, state); public override int EndRead(IAsyncResult asyncResult) => _innerStream.EndRead(asyncResult); public override int ReadByte() => _innerStream.ReadByte(); public override void Flush() => _innerStream.Flush(); public override Task FlushAsync(CancellationToken cancellationToken) => _innerStream.FlushAsync(cancellationToken); public override void SetLength(long value) => _innerStream.SetLength(value); public override void Write(byte[] buffer, int offset, int count) => _innerStream.Write(buffer, offset, count); public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => _innerStream.WriteAsync(buffer, offset, count, cancellationToken); public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) => _innerStream.BeginWrite(buffer, offset, count, callback, state); public override void EndWrite(IAsyncResult asyncResult) => _innerStream.EndWrite(asyncResult); public override void WriteByte(byte value) => _innerStream.WriteByte(value); public override void Close() => _innerStream.Close(); } } #endif