in MREUnityRuntimeLib/IPC/Connections/WebSocket.cs [194:333]
private async Task ReadTask()
{
try
{
var wasOpen = false;
var stream = new MemoryStream();
var receiveBuffer = new ArraySegment<byte>(new byte[ReceiveBufferSize], 0, ReceiveBufferSize);
WebSocketReceiveResult result = default;
CancellationTokenSource sendWorkerCancellationSource = default;
Task sendWorker = default;
while (true)
{
try
{
// Exit task if requested.
CancellationToken.ThrowIfCancellationRequested();
// Notify host the we're attempting to establish the connection.
Invoke_OnConnecting();
// Start connecting.
wasOpen = false;
await Connect().ConfigureAwait(false);
// Wait until the connection is fully resolved (whether that be success or failure).
while (_ws.State == WebSocketState.Connecting)
{
// Exit task if requested.
CancellationToken.ThrowIfCancellationRequested();
// Pause for a very short period (polling for state change).
await Task.Delay(20, CancellationToken).ConfigureAwait(false);
}
// If connection failed, retry after a short delay.
if (_ws.State != WebSocketState.Open)
{
await Task.Delay(1000, CancellationToken).ConfigureAwait(false);
continue;
}
// Connection has successfully established.
wasOpen = true;
// Once connected, start the send worker.
sendWorkerCancellationSource = new CancellationTokenSource();
var sendWorkerCancellationToken = sendWorkerCancellationSource.Token;
sendWorker = Task.Run(async () =>
{
while (true)
{
try
{
sendWorkerCancellationSource.Token.ThrowIfCancellationRequested();
if (_sendQueue.TryTake(out Func<Task> send, -1, sendWorkerCancellationToken))
{
await send().ConfigureAwait(false);
}
}
catch
{
break;
}
}
}, sendWorkerCancellationToken);
// Notify host the connection was successfully established (it's important to do this after send worker startup).
Invoke_OnConnected();
while (_ws.State == WebSocketState.Open)
{
// Read a complete message.
do
{
result = await _ws.ReceiveAsync(receiveBuffer, CancellationToken).ConfigureAwait(false);
if (result != null)
{
if (result.MessageType == WebSocketMessageType.Close)
{
await CloseInternal(false, "Closed remotely").ConfigureAwait(false);
}
else if (result.MessageType == WebSocketMessageType.Text)
{
stream.Write(receiveBuffer.Array, 0, result.Count);
}
else
{
throw new NotSupportedException($"Unsupported WebSocketMessageType: {result.MessageType}");
}
}
// Exit task if requested.
CancellationToken.ThrowIfCancellationRequested();
}
while (result != null && !result.EndOfMessage);
// Dispatch the message.
if (result != null && result.EndOfMessage && stream.Length > 0)
{
var json = Encoding.UTF8.GetString(stream.GetBuffer(), 0, (int)stream.Length);
Invoke_OnReceive(json);
}
// Reset accumulation buffer.
stream.SetLength(0);
}
}
catch (OperationCanceledException)
{ }
catch (Exception e)
{
Invoke_OnError(e);
}
if (wasOpen)
{
// Shutdown the send worker.
sendWorkerCancellationSource.Cancel();
await sendWorker.ConfigureAwait(false);
sendWorkerCancellationSource.Dispose();
// Notify host the connection was closed.
Invoke_OnDisconnected();
}
// Exit task if requested.
CancellationToken.ThrowIfCancellationRequested();
// Attempt to reconnect after a short delay.
await Task.Delay(1000, CancellationToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
{ }
catch (Exception e)
{
Invoke_OnError(e);
}
}