private async Task ReadTask()

in MREUnityRuntimeLib/IPC/Connections/WebSocket.cs [194:333]


		/// <summary>
		/// Long-running connection loop. Repeatedly connects the socket, starts a send worker that drains
		/// <c>_sendQueue</c>, receives text messages and dispatches each complete message via
		/// <c>Invoke_OnReceive</c>. When the connection ends (or fails to open) it waits one second and
		/// tries again, until <c>CancellationToken</c> is cancelled.
		/// </summary>
		/// <remarks>
		/// Exceptions are not propagated to the caller: cancellation is swallowed and every other
		/// exception is reported through <c>Invoke_OnError</c>.
		/// </remarks>
		/// <returns>A task that completes once cancellation was requested or an unrecoverable error was reported.</returns>
		private async Task ReadTask()
		{
			try
			{
				var wasOpen = false;
				// Accumulates the fragments of the message currently being received.
				var stream = new MemoryStream();
				var receiveBuffer = new ArraySegment<byte>(new byte[ReceiveBufferSize], 0, ReceiveBufferSize);
				WebSocketReceiveResult result = default;
				CancellationTokenSource sendWorkerCancellationSource = default;
				Task sendWorker = default;

				while (true)
				{
					try
					{
						// Exit task if requested.
						CancellationToken.ThrowIfCancellationRequested();

						// Notify host that we're attempting to establish the connection.
						Invoke_OnConnecting();

						// Start connecting.
						wasOpen = false;
						await Connect().ConfigureAwait(false);

						// Wait until the connection is fully resolved (whether that be success or failure).
						while (_ws.State == WebSocketState.Connecting)
						{
							// Exit task if requested.
							CancellationToken.ThrowIfCancellationRequested();
							// Pause for a very short period (polling for state change).
							await Task.Delay(20, CancellationToken).ConfigureAwait(false);
						}

						// If connection failed, retry after a short delay.
						if (_ws.State != WebSocketState.Open)
						{
							await Task.Delay(1000, CancellationToken).ConfigureAwait(false);
							continue;
						}

						// Connection has successfully established.
						wasOpen = true;

						// Drop any fragments left over from a previous connection that failed mid-message,
						// so they cannot be prepended to the first message of this connection.
						stream.SetLength(0);

						// Once connected, start the send worker.
						sendWorkerCancellationSource = new CancellationTokenSource();
						var sendWorkerCancellationToken = sendWorkerCancellationSource.Token;
						sendWorker = Task.Run(async () =>
						{
							while (true)
							{
								try
								{
									// Use the captured token rather than the outer source variable, which is
									// reassigned (and disposed) on every reconnect.
									sendWorkerCancellationToken.ThrowIfCancellationRequested();

									// Blocks (timeout -1) until a send is queued or the worker is cancelled.
									if (_sendQueue.TryTake(out Func<Task> send, -1, sendWorkerCancellationToken))
									{
										await send().ConfigureAwait(false);
									}
								}
								catch
								{
									// Any failure (cancellation or a failed send) stops the worker.
									// The exception is intentionally not reported here.
									break;
								}
							}
						}, sendWorkerCancellationToken);

						// Notify host the connection was successfully established (it's important to do this after send worker startup).
						Invoke_OnConnected();

						while (_ws.State == WebSocketState.Open)
						{
							// Read a complete message.
							do
							{
								result = await _ws.ReceiveAsync(receiveBuffer, CancellationToken).ConfigureAwait(false);
								if (result != null)
								{
									if (result.MessageType == WebSocketMessageType.Close)
									{
										await CloseInternal(false, "Closed remotely").ConfigureAwait(false);

										// A close may arrive while a message is only partially received;
										// discard the fragments so a truncated message is never dispatched.
										stream.SetLength(0);
									}
									else if (result.MessageType == WebSocketMessageType.Text)
									{
										stream.Write(receiveBuffer.Array, receiveBuffer.Offset, result.Count);
									}
									else
									{
										throw new NotSupportedException($"Unsupported WebSocketMessageType: {result.MessageType}");
									}
								}

								// Exit task if requested.
								CancellationToken.ThrowIfCancellationRequested();
							}
							while (result != null && !result.EndOfMessage);

							// Dispatch the message.
							if (result != null && result.EndOfMessage && stream.Length > 0)
							{
								var json = Encoding.UTF8.GetString(stream.GetBuffer(), 0, (int)stream.Length);
								Invoke_OnReceive(json);
							}

							// Reset accumulation buffer.
							stream.SetLength(0);
						}
					}
					catch (OperationCanceledException)
					{
						// Fall through to the cleanup below; the cancellation check after it exits the task.
					}
					catch (Exception e)
					{
						Invoke_OnError(e);
					}

					if (wasOpen)
					{
						// Shutdown the send worker.
						sendWorkerCancellationSource.Cancel();
						try
						{
							await sendWorker.ConfigureAwait(false);
						}
						catch (OperationCanceledException)
						{
							// The worker was cancelled before its delegate started running, so the task
							// itself ended up canceled. That is a normal shutdown, not a reason to
							// abandon the reconnect loop.
						}
						sendWorkerCancellationSource.Dispose();

						// Notify host the connection was closed.
						Invoke_OnDisconnected();
					}

					// Exit task if requested.
					CancellationToken.ThrowIfCancellationRequested();

					// Attempt to reconnect after a short delay.
					await Task.Delay(1000, CancellationToken).ConfigureAwait(false);
				}
			}
			catch (OperationCanceledException)
			{
				// Cancellation requested: end the task quietly.
			}
			catch (Exception e)
			{
				Invoke_OnError(e);
			}
		}