sources/Google.Solutions.Iap/Net/SingleReaderSingleWriterStream.cs (78 lines of code) (raw):

// // Copyright 2019 Google LLC // // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // using System.Threading; using System.Threading.Tasks; namespace Google.Solutions.Iap.Net { /// <summary> /// Base class for a stream that only allows one reader and one writer /// at a time. /// </summary> public abstract class SingleReaderSingleWriterStream : OneTimeUseStream { private readonly SemaphoreSlim readerSemaphore = new SemaphoreSlim(1); private readonly SemaphoreSlim writerSemaphore = new SemaphoreSlim(1); //--------------------------------------------------------------------- // Methods to be overridden //--------------------------------------------------------------------- protected abstract Task<int> ProtectedReadAsync( byte[] buffer, int offset, int count, CancellationToken cancellationToken); protected abstract Task ProtectedWriteAsync( byte[] buffer, int offset, int count, CancellationToken cancellationToken); public abstract Task ProtectedCloseAsync(CancellationToken cancellationToken); //--------------------------------------------------------------------- // Publics //--------------------------------------------------------------------- protected override async Task<int> ReadAsyncWithCloseProtection( byte[] buffer, int offset, int count, CancellationToken cancellationToken) { try { // // Acquire semaphore to ensure that only a single // read operation is in flight at a time. // await this.readerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); return await ProtectedReadAsync( buffer, offset, count, cancellationToken).ConfigureAwait(false); } finally { this.readerSemaphore.Release(); } } protected override async Task WriteAsyncWithCloseProtection(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { try { // // Acquire semaphore to ensure that only a single // write/close operation is in flight at a time. // await this.writerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); await ProtectedWriteAsync( buffer, offset, count, cancellationToken).ConfigureAwait(false); } finally { this.writerSemaphore.Release(); } } protected override async Task CloseAsyncWithCloseProtection(CancellationToken cancellationToken) { try { // // Acquire semaphore to ensure that only a single // write/close operation is in flight at a time. // await this.writerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); await ProtectedCloseAsync(cancellationToken).ConfigureAwait(false); } finally { this.writerSemaphore.Release(); } } protected override void Dispose(bool disposing) { if (disposing) { this.readerSemaphore.Dispose(); this.writerSemaphore.Dispose(); } } } }