sources/Google.Solutions.Iap/Net/WebSocketStream.cs (206 lines of code) (raw):
//
// Copyright 2019 Google LLC
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
using Google.Solutions.Common.Diagnostics;
using Google.Solutions.Common.Util;
using System;
using System.Diagnostics;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
namespace Google.Solutions.Iap.Net
{
/// <summary>
/// Stream that allows sending and receiving WebSocket frames.
/// </summary>
public class WebSocketStream : SingleReaderSingleWriterStream
{
private readonly ClientWebSocket socket;
private volatile bool closeByClientInitiated = false;
public bool IsCloseInitiated => this.closeByClientInitiated;
//---------------------------------------------------------------------
// Ctor
//---------------------------------------------------------------------
public WebSocketStream(ClientWebSocket socket)
{
if (socket.State != WebSocketState.Open)
{
throw new ArgumentException("Web socket must be open");
}
this.socket = socket.ExpectNotNull(nameof(socket));
}
//---------------------------------------------------------------------
// Privates
//---------------------------------------------------------------------
private static bool IsSocketError(Exception caughtEx, SocketError error)
{
// ClientWebSocket throws almost arbitrary nestings of
// SocketExceptions, IOExceptions, WebSocketExceptions, all
// wrapped in AggregateExceptions.
for (var ex = caughtEx; ex != null; ex = ex.InnerException)
{
if (ex is SocketException socketException)
{
if (socketException.SocketErrorCode == error)
{
return true;
}
}
}
return false;
}
private static bool IsWebSocketError(Exception caughtEx, WebSocketError error)
{
// ClientWebSocket throws almost arbitrary nestings of
// WebSocketException, IOExceptions, SocketExceptions, all
// wrapped in AggregateExceptions.
for (var ex = caughtEx; ex != null; ex = ex.InnerException)
{
if (ex is WebSocketException socketException)
{
if (socketException.WebSocketErrorCode == error)
{
return true;
}
}
}
return false;
}
//---------------------------------------------------------------------
// SingleReaderSingleWriterStream implementation
//---------------------------------------------------------------------
private void VerifyConnectionNotClosedAlready()
{
if (this.closeByClientInitiated)
{
// Do not even try to send, it will not succeed anyway.
throw new WebSocketStreamClosedByClientException();
}
}
/// <summary>
/// Read until (a) the buffer is full or (b) the end of
/// the frame has been reached.
/// </summary>
protected override async Task<int> ProtectedReadAsync(
byte[] buffer,
int offset,
int count,
CancellationToken cancellationToken)
{
VerifyConnectionNotClosedAlready();
try
{
var bytesReceived = 0;
var bytesLeftInBuffer = count;
WebSocketReceiveResult result;
do
{
IapTraceSource.Log.TraceVerbose(
"WebSocketStream: begin ReadAsync()... [socket: {0}]",
this.socket.State);
result = await this.socket.ReceiveAsync(
new ArraySegment<byte>(
buffer,
offset + bytesReceived,
bytesLeftInBuffer),
cancellationToken)
.ConfigureAwait(false);
bytesReceived += result.Count;
bytesLeftInBuffer -= result.Count;
Debug.Assert(bytesReceived + bytesLeftInBuffer == count);
IapTraceSource.Log.TraceVerbose(
"WebSocketStream: end ReadAsync() - {0} bytes read [socket: {1}]",
result.Count,
this.socket.State);
}
while (bytesLeftInBuffer > 0 && !result.EndOfMessage);
if (result.CloseStatus != null)
{
Debug.Assert(bytesReceived == 0);
IapTraceSource.Log.TraceVerbose(
"WebSocketStream: Connection closed by server: {0}",
result.CloseStatus);
//
// In case of a normal close, it is preferable to simply return 0. But
// if the connection was closed abnormally, the client needs to know
// the details.
//
if (result.CloseStatus.Value != WebSocketCloseStatus.NormalClosure)
{
throw new WebSocketStreamClosedByServerException(
result.CloseStatus.Value,
result.CloseStatusDescription);
}
else
{
Debug.Assert(bytesReceived == 0);
}
}
return bytesReceived;
}
catch (Exception e) when (
IsSocketError(e, SocketError.ConnectionAborted) ||
IsWebSocketError(e, WebSocketError.ConnectionClosedPrematurely))
{
IapTraceSource.Log.TraceVerbose("WebSocketStream.Read: connection aborted - {0}", e);
throw new WebSocketStreamClosedByServerException(
(WebSocketCloseStatus)1006, // Abnormal closure
e.Message);
}
}
protected override async Task ProtectedWriteAsync(
byte[] buffer,
int offset,
int count,
CancellationToken cancellationToken)
{
VerifyConnectionNotClosedAlready();
try
{
IapTraceSource.Log.TraceVerbose(
"WebSocketStream: begin WriteAsync({0} bytes)... [socket: {1}]",
count,
this.socket.State);
await this.socket.SendAsync(
new ArraySegment<byte>(buffer, offset, count),
WebSocketMessageType.Binary,
true,
cancellationToken)
.ConfigureAwait(false);
IapTraceSource.Log.TraceVerbose(
"WebSocketStream: end WriteAsync()... [socket: {0}]",
this.socket.State);
}
catch (Exception e) when (
IsSocketError(e, SocketError.ConnectionAborted) ||
IsWebSocketError(e, WebSocketError.ConnectionClosedPrematurely))
{
IapTraceSource.Log.TraceVerbose("WebSocketStream.Write: connection aborted - {0}", e);
throw new WebSocketStreamClosedByServerException(
(WebSocketCloseStatus)1006, // Abnormal closure
e.Message);
}
}
public override async Task ProtectedCloseAsync(CancellationToken cancellationToken)
{
VerifyConnectionNotClosedAlready();
try
{
this.closeByClientInitiated = true;
await this.socket.CloseOutputAsync(
WebSocketCloseStatus.NormalClosure,
string.Empty,
cancellationToken).ConfigureAwait(false);
}
catch (Exception e) when (
IsWebSocketError(e, WebSocketError.InvalidMessageType) ||
IsWebSocketError(e, WebSocketError.InvalidState) ||
IsSocketError(e, SocketError.ConnectionAborted))
{
//
// Server already closed the connection - nevermind then.
//
}
}
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
this.socket.Dispose();
}
}
public class WebSocketStreamClosedByClientException : NetworkStreamClosedException
{
public WebSocketStreamClosedByClientException()
: base("The connection has already been closed by the client")
{
}
}
public class WebSocketConnectionDeniedException : Exception
{
public WebSocketConnectionDeniedException()
: base("The server denied the use of WebSockets")
{
}
}
public class WebSocketStreamClosedByServerException : NetworkStreamClosedException
{
public WebSocketCloseStatus CloseStatus { get; private set; }
public string CloseStatusDescription { get; private set; }
public WebSocketStreamClosedByServerException(
WebSocketCloseStatus closeStatus,
string closeStatusDescription)
: base($"{closeStatusDescription} (code {closeStatus})")
{
this.CloseStatus = closeStatus;
this.CloseStatusDescription = closeStatusDescription;
}
}
}