sources/Google.Solutions.Ssh/SshShellChannel.cs (138 lines of code) (raw):

// // Copyright 2024 Google LLC // // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // using Google.Solutions.Common.Util; using Google.Solutions.Platform.IO; using Google.Solutions.Ssh.Native; using System; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Google.Solutions.Ssh { /// <summary> /// Channel for interacting with a remote shell. /// </summary> /// <remarks> /// Events are delivered on the worker thread. /// </remarks> public class SshShellChannel : SshChannelBase, IPseudoTerminal { /// <summary> /// Encoding used by pseudo-terminal. /// </summary> public static readonly Encoding DefaultEncoding = Encoding.UTF8; /// <summary> /// Channel handle, must only be accessed on worker thread. /// </summary> private readonly Libssh2ShellChannel nativeChannel; private readonly StreamingDecoder receiveDecoder; private readonly byte[] receiveBuffer = new byte[64 * 1024]; /// <summary> /// Task that is completed when an EOF was received. /// </summary> /// <remarks> /// Force continuations to run asycnhronously so that they /// don't block the worker thread. /// </remarks> private readonly TaskCompletionSource<object?> endOfStream = new TaskCompletionSource<object?>( TaskCreationOptions.RunContinuationsAsynchronously); internal SshShellChannel( SshConnection connection, Libssh2ShellChannel nativeChannel) { this.Connection = connection; this.nativeChannel = nativeChannel; this.receiveDecoder = new StreamingDecoder(DefaultEncoding); } //--------------------------------------------------------------------- // IPseudoTerminal. //--------------------------------------------------------------------- public event EventHandler<PseudoTerminalDataEventArgs>? OutputAvailable; public event EventHandler<PseudoTerminalErrorEventArgs>? FatalError; public event EventHandler<EventArgs>? Disconnected; public Task DrainAsync() { return this.endOfStream.Task; } public async Task ResizeAsync( PseudoTerminalSize dimensions, CancellationToken cancellationToken) { // // Switch to worker thread and write to channel. // try { await this.Connection .RunAsync(_ => { Debug.Assert(this.Connection.IsRunningOnWorkerThread); this.nativeChannel.ResizePseudoTerminal( dimensions.Width, dimensions.Height); }) .ConfigureAwait(false); } catch (SshConnectionClosedException) { // // Connection closed already. This can happen // if the connection was disconnected by the // server, and the client has't caught up to that. // } } public async Task WriteAsync( string data, CancellationToken cancellationToken) { if (data.Length == 0) { return; } // // Switch to worker thread and write to channel. // await this.Connection .RunAsync(_ => { Debug.Assert(this.Connection.IsRunningOnWorkerThread); this.nativeChannel.Write(DefaultEncoding.GetBytes(data)); }) .ConfigureAwait(false); } //--------------------------------------------------------------------- // Overrides. //--------------------------------------------------------------------- public override SshConnection Connection { get; } internal override void OnReceive() { Debug.Assert(this.Connection.IsRunningOnWorkerThread); // // NB. This method is always called on the same thread, so it's ok // to reuse the same buffer. // uint bytesReceived; bool endOfStream; // // Read as much data as available. // do { bytesReceived = this.nativeChannel.Read(this.receiveBuffer); endOfStream = this.nativeChannel.IsEndOfStream; var receivedData = this.receiveDecoder.Decode( this.receiveBuffer, 0, (int)bytesReceived); try { if (bytesReceived > 0) { this.OutputAvailable?.Invoke( this, new PseudoTerminalDataEventArgs(receivedData)); } if (endOfStream) { // // End of stream reached, that means we're // disconnecting. // this.Disconnected?.Invoke(this, EventArgs.Empty); this.endOfStream.SetResult(null); } } catch (Exception e) { this.FatalError?.Invoke(this, new PseudoTerminalErrorEventArgs(e)); } } while (bytesReceived > 0 && !endOfStream); } internal override void OnReceiveError(Exception exception) { Debug.Assert(this.Connection.IsRunningOnWorkerThread); var errorsIndicatingLostConnection = new[] { LIBSSH2_ERROR.SOCKET_SEND, LIBSSH2_ERROR.SOCKET_RECV, LIBSSH2_ERROR.SOCKET_TIMEOUT }; var unwrappedException = exception.Unwrap(); var lostConnection = unwrappedException is Libssh2Exception sshEx && errorsIndicatingLostConnection.Contains(sshEx.ErrorCode); if (lostConnection) { this.Disconnected?.Invoke(this, EventArgs.Empty); } else { this.FatalError?.Invoke( this, new PseudoTerminalErrorEventArgs(unwrappedException)); } } protected override void Close() { Debug.Assert(this.Connection.IsRunningOnWorkerThread); this.nativeChannel.Close(); this.nativeChannel.Dispose(); } } }