in src/Microsoft.VisualStudio.Threading/ReentrantSemaphore.cs [663:774]
public override async Task ExecuteAsync(Func<Task> operation, CancellationToken cancellationToken = default)
{
Requires.NotNull(operation, nameof(operation));
this.ThrowIfFaulted();
// No race condition here: We're accessing AsyncLocal<T> which we by definition have our own copy of.
// Multiple threads or multiple async methods will all have their own storage for this field.
Stack<StrongBox<AsyncSemaphore.Releaser>>? reentrantStack = this.reentrantCount.Value;
if (reentrantStack is null || reentrantStack.Count == 0)
{
// When the stack is empty, the semaphore isn't held. But many execution contexts that forked from a common root
// would be sharing this same empty Stack<T> instance. If we pushed to that Stack, all those forks would suddenly
// be seen as having entered this new top-level semaphore. We therefore allocate a new Stack and assign it to our
// AsyncLocal<T> field so that only this particular ExecutionContext is seen as having entered the semaphore.
this.reentrantCount.Value = reentrantStack = new Stack<StrongBox<AsyncSemaphore.Releaser>>(capacity: 2);
}
// Note: this code is duplicated and not extracted to minimize allocating extra async state machines.
// For performance reasons in the JTF enabled scenario, we want to minimize the number of Joins performed, and also
// keep the size of the JoinableCollection to a minimum. This also means awaiting on the semaphore outside of a
// JTF.RunAsync. This requires us to not ConfigureAwait(true) on the semaphore. However, that prevents us from
// resuming on the correct sync context. To partially fix this, we will at least resume you on the main thread or
// thread pool.
AsyncSemaphore.Releaser releaser = default;
bool pushed = false;
StrongBox<AsyncSemaphore.Releaser>? pushedReleaser = null;
try
{
bool resumeOnMainThread = this.IsJoinableTaskAware(out _, out JoinableTaskCollection? joinableTaskCollection)
? joinableTaskCollection.Context.IsOnMainThread
: false;
bool mustYield = false;
if (reentrantStack.Count == 0)
{
using (this.joinableTaskCollection?.Join())
{
if (this.IsJoinableTaskAware(out _, out _))
{
// Use ConfiguredAwaitRunInline() as ConfigureAwait(true) will
// deadlock due to not being inside a JTF.RunAsync().
Task<AsyncSemaphore.Releaser>? releaserTask = this.semaphore.EnterAsync(cancellationToken);
// Yield to prevent running on the stack that released the semaphore.
mustYield = !releaserTask.IsCompleted;
releaser = await releaserTask.ConfigureAwaitRunInline();
}
else
{
releaser = await this.semaphore.EnterAsync(cancellationToken).ConfigureAwait(true);
}
}
}
else
{
releaser = default;
}
pushedReleaser = new StrongBox<AsyncSemaphore.Releaser>(releaser);
await this.ExecuteCoreAsync(async delegate
{
if (this.IsJoinableTaskAware(out JoinableTaskFactory? joinableTaskFactory, out _))
{
if (resumeOnMainThread)
{
// Return to the main thread if we started there.
await joinableTaskFactory.SwitchToMainThreadAsync(alwaysYield: mustYield, cancellationToken);
}
else
{
await TaskScheduler.Default.SwitchTo(alwaysYield: mustYield);
}
}
// The semaphore faulted while we were waiting on it.
this.ThrowIfFaulted();
lock (reentrantStack)
{
reentrantStack.Push(pushedReleaser);
pushed = true;
}
await operation().ConfigureAwaitRunInline();
});
}
finally
{
try
{
if (pushed)
{
lock (reentrantStack)
{
StrongBox<AsyncSemaphore.Releaser>? poppedReleaser = reentrantStack.Pop();
if (!object.ReferenceEquals(poppedReleaser, pushedReleaser))
{
// When the semaphore faults, we will drain and throw for awaiting tasks one by one.
this.faulted = true;
#pragma warning disable CA2219 // Do not raise exceptions in finally clauses
throw new IllegalSemaphoreUsageException(string.Format(CultureInfo.CurrentCulture, Strings.SemaphoreStackNestingViolated, ReentrantSemaphore.ReentrancyMode.Stack));
#pragma warning restore CA2219 // Do not raise exceptions in finally clauses
}
}
}
}
finally
{
DisposeReleaserNoThrow(releaser);
}
}
}