sources/Google.Solutions.Ssh/SftpFileStream.cs (197 lines of code) (raw):

// // Copyright 2024 Google LLC // // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // using Google.Solutions.Common.Util; using Google.Solutions.Ssh.Native; using System; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.IO; using System.Threading; using System.Threading.Tasks; namespace Google.Solutions.Ssh { internal class SftpFileStream : Stream { /// <summary> /// Native channel, can only be accessed on the worker thread. /// </summary> private readonly Libssh2SftpFileChannel nativeChannel; /// <summary> /// Connection used by this stream. /// </summary> public SshConnection Connection { get; } /// <summary> /// Flags the file has been opened with. /// </summary> private readonly LIBSSH2_FXF_FLAGS flags; private readonly LIBSSH2_SFTP_ATTRIBUTES attributes; private long position; internal SftpFileStream( SshConnection connection, Libssh2SftpFileChannel nativeChannel, LIBSSH2_FXF_FLAGS flags) { Debug.Assert(connection.IsRunningOnWorkerThread); this.Connection = connection; this.nativeChannel = nativeChannel; this.flags = flags; this.attributes = nativeChannel.Attributes; } protected override void Dispose(bool disposing) { base.Dispose(disposing); if (disposing) { if (this.Connection.IsRunningOnWorkerThread) { this.nativeChannel.Dispose(); } else { _ = this.Connection.RunAsync( _ => this.nativeChannel.Dispose(), false); } } } //---------------------------------------------------------------------- // Attributes. //---------------------------------------------------------------------- /// <summary> /// Length of file at the time it was opened. /// </summary> public override long Length { get { // // Return size at the time of open so that we don't need // to block. // if (this.attributes.flags.HasFlag(LIBSSH2_SFTP_ATTR.SIZE)) { return (long)this.attributes.filesize; } else { throw new InvalidOperationException( "The file size is not available"); } } } //---------------------------------------------------------------------- // Reading. //---------------------------------------------------------------------- public override bool CanRead { get => this.flags.HasFlag(LIBSSH2_FXF_FLAGS.READ); } [SuppressMessage("Usage", "VSTHRD002:Avoid problematic synchronous waits", Justification = "")] public override int Read(byte[] buffer, int offset, int count) { try { return ReadAsync(buffer, offset, count).Result; } catch (AggregateException e) { throw e.Unwrap(); } } public override async Task<int> ReadAsync( byte[] buffer, int offset, int count, CancellationToken cancellationToken) { if (!this.CanRead) { throw new NotSupportedException("Stream is not readable"); } cancellationToken.ThrowIfCancellationRequested(); // // Perform a synchronous read on the worker thread. // var bytesRead = await this.Connection .RunAsync( session => { Debug.Assert(this.Connection.IsRunningOnWorkerThread); using (session.Session.AsBlocking()) { if (offset == 0 && count == buffer.Length) { // // Use the supplied buffer. // return (int)this.nativeChannel.Read(buffer); } else { var readBuffer = new byte[count]; var bytesRead = this.nativeChannel.Read(readBuffer); Array.Copy(readBuffer, 0, buffer, offset, count); return (int)bytesRead; } } }, false) .ConfigureAwait(false); this.position += bytesRead; return bytesRead; } //---------------------------------------------------------------------- // Writing. //---------------------------------------------------------------------- public override bool CanWrite { get => this.flags.HasFlag(LIBSSH2_FXF_FLAGS.WRITE); } [SuppressMessage("Usage", "VSTHRD002:Avoid problematic synchronous waits", Justification = "")] public override void Write(byte[] buffer, int offset, int count) { try { WriteAsync(buffer, offset, count).Wait(); } catch (AggregateException e) { throw e.Unwrap(); } } public override async Task WriteAsync( byte[] buffer, int offset, int count, CancellationToken cancellationToken) { if (!this.CanWrite) { throw new NotSupportedException("Stream is not writable"); } cancellationToken.ThrowIfCancellationRequested(); // // Perform a synchronous write on the worker thread. // await this.Connection .RunAsync<object?>( session => { Debug.Assert(this.Connection.IsRunningOnWorkerThread); using (session.Session.AsBlocking()) { if (offset == 0) { // // Use the supplied buffer. // this.nativeChannel.Write(buffer, count); } else { var writeBuffer = new byte[count]; Array.Copy(buffer, offset, writeBuffer, 0, count); this.nativeChannel.Write(writeBuffer, count); } } return session; }, false) .ConfigureAwait(false); this.position += count; } public override void Flush() { } //---------------------------------------------------------------------- // Seeking (not supported). //---------------------------------------------------------------------- public override bool CanSeek { get => false; } public override void SetLength(long value) { throw new NotSupportedException(); } public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); } public override long Position { get => this.position; set { if (this.position != value) { throw new NotSupportedException( "The stream does not support seeking"); } } } } }