src/Microsoft.Azure.Relay/HybridConnectionStream.cs (62 lines of code) (raw):
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Microsoft.Azure.Relay
{
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// A Stream representing a connected HybridConnection. Use it just like any other Stream with the addition of a
/// Shutdown method for notifying the other side of this connection that shutdown is occurring.
/// </summary>
public abstract class HybridConnectionStream : Stream, ITraceSource
{
string cachedToString;
internal HybridConnectionStream(TrackingContext trackingContext)
{
this.TrackingContext = trackingContext;
}
/// <summary>
/// Sets or gets the WriteMode for this stream. Default is WriteMode.Binary
/// </summary>
public WriteMode WriteMode { get; set; } = WriteMode.Binary;
/// <summary>
/// Gets the TrackingContext for this stream.
/// </summary>
public TrackingContext TrackingContext { get; }
/// <summary>
/// Initiates a graceful close process by shutting down sending through this
/// <see cref="HybridConnectionStream"/>. To disconnect cleanly and asynchronously, call Shutdown,
/// wait for Read/ReadAsync to complete with a 0 byte read, then finally call Stream.Close();
/// </summary>
public virtual void Shutdown()
{
using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(this.WriteTimeout)))
{
this.ShutdownAsync(cts.Token).ConfigureAwait(false).GetAwaiter().GetResult();
}
}
/// <summary>
/// Initiates a graceful close process by shutting down sending through this
/// <see cref="HybridConnectionStream"/>. To disconnect cleanly and asynchronously, call ShutdownAsync,
/// wait for Read/ReadAsync to complete with a 0 byte read, then finally call Stream.CloseAsync();
/// </summary>
/// <param name="cancellationToken">A cancellation token to observe.</param>
public async Task ShutdownAsync(CancellationToken cancellationToken)
{
RelayEventSource.Log.Info(this, "Shutting down");
await this.OnShutdownAsync(cancellationToken).ConfigureAwait(false);
RelayEventSource.Log.Info(this, "Shut down");
}
/// <summary>
/// Returns a string that represents the current object. Includes a TrackingId for end to end correlation.
/// </summary>
public override string ToString()
{
return this.cachedToString ?? (this.cachedToString = nameof(HybridConnectionStream) + "(" + this.TrackingContext + ")");
}
/// <summary>
/// Closes this <see cref="HybridConnectionStream"/> instance.
/// </summary>
/// <param name="disposing">true to release both managed and unmanaged resources; false to release only unmanaged resources.</param>
protected override void Dispose(bool disposing)
{
if (disposing)
{
using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(this.ReadTimeout)))
{
this.CloseAsync(cts.Token).ConfigureAwait(false).GetAwaiter().GetResult();
}
}
base.Dispose(disposing);
}
/// <summary>
/// Closes this <see cref="HybridConnectionStream"/> instance asynchronously using a <see cref="CancellationToken"/>.
/// </summary>
/// <param name="cancellationToken">A cancellation token to observe.</param>
public async Task CloseAsync(CancellationToken cancellationToken)
{
try
{
RelayEventSource.Log.ObjectClosing(this);
await this.OnCloseAsync(cancellationToken).ConfigureAwait(false);
RelayEventSource.Log.ObjectClosed(this);
}
catch (Exception e) when (!Fx.IsFatal(e))
{
RelayEventSource.Log.ThrowingException(e, this);
throw;
}
}
/// <summary>
/// Derived classes implement shutdown logic in this method.
/// </summary>
/// <param name="cancellationToken">A cancellation token to observe.</param>
protected abstract Task OnShutdownAsync(CancellationToken cancellationToken);
/// <summary>
/// Derived classes implement close logic in this method.
/// </summary>
/// <param name="cancellationToken">A cancellation token to observe.</param>
protected abstract Task OnCloseAsync(CancellationToken cancellationToken);
}
}