sources/Google.Solutions.IapDesktop.Core/ObjectModel/EventQueue.cs (197 lines of code) (raw):

// // Copyright 2020 Google LLC // // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // using Google.Solutions.Common.Diagnostics; using Google.Solutions.Common.Threading; using Google.Solutions.Common.Util; using System; using System.Collections.Generic; using System.ComponentModel; using System.Diagnostics; using System.Linq; using System.Reflection; using System.Threading; using System.Threading.Tasks; namespace Google.Solutions.IapDesktop.Core.ObjectModel { public class EventQueue : IEventQueue { private readonly ISynchronizeInvoke invoker; private readonly object subscriptionsLock; private readonly IDictionary<Type, List<ISubscription>> subscriptionsByEvent; /// <summary> /// Create an event queue. /// </summary> /// <param name="invoker">Invoker to use for publishing events</param> public EventQueue(ISynchronizeInvoke invoker) { this.invoker = invoker.ExpectNotNull(nameof(invoker)); this.subscriptionsLock = new object(); this.subscriptionsByEvent = new Dictionary<Type, List<ISubscription>>(); } private bool Unsubscribe<TEvent>(ISubscription subscription) { lock (this.subscriptionsLock) { if (this.subscriptionsByEvent.TryGetValue(typeof(TEvent), out var subscribers)) { return subscribers.Remove(subscription); } return false; } } internal IEnumerable<Subscription<TEvent>> GetSubscriptions<TEvent>() { lock (this.subscriptionsLock) { if (this.subscriptionsByEvent.TryGetValue(typeof(TEvent), out var subscriptions)) { // // Create a snapshot that remains valid when // we leave the lock. // return new List<Subscription<TEvent>>( subscriptions.OfType<Subscription<TEvent>>()); } else { return Enumerable.Empty<Subscription<TEvent>>(); } } } //--------------------------------------------------------------------- // IEventQueue. //--------------------------------------------------------------------- public ISubscription Subscribe<TEvent>( Func<TEvent, Task> handler, SubscriptionOptions lifecycle = SubscriptionOptions.None) { lock (this.subscriptionsLock) { if (!this.subscriptionsByEvent.TryGetValue(typeof(TEvent), out var subscriptions)) { subscriptions = new List<ISubscription>(); this.subscriptionsByEvent.Add(typeof(TEvent), subscriptions); } Subscription<TEvent> subsciption; if (lifecycle == SubscriptionOptions.WeakSubscriberReference) { subsciption = new WeakSubscription<TEvent>(this, handler); } else { subsciption = new StrongSubscription<TEvent>(this, handler); } subscriptions.Add(subsciption); return subsciption; } } public ISubscription Subscribe<TEvent>( Action<TEvent> handler, SubscriptionOptions lifecycle = SubscriptionOptions.None) { return Subscribe<TEvent>(e => { handler(e); return Task.CompletedTask; }, lifecycle); } public ISubscription Subscribe<TEvent>( IAsyncSubscriber<TEvent> subscriber, SubscriptionOptions lifecycle = SubscriptionOptions.None) { return Subscribe<TEvent>(subscriber.NotifyAsync, lifecycle); } public ISubscription Subscribe<TEvent>( ISubscriber<TEvent> subscriber, SubscriptionOptions lifecycle = SubscriptionOptions.None) { return Subscribe<TEvent>(subscriber.Notify, lifecycle); } public Task PublishAsync<TEvent>(TEvent eventObject) { return this.invoker.InvokeAsync(async () => { // // We're on the right thread now. Grab a snapshot of relevant // subcriptions and invoke them. // // NB. It's possible that while we're doing that, a subscription // is disposed. Bu the Subscription.Invoke implementation takes // care of that. // foreach (var subscriber in GetSubscriptions<TEvent>()) { await subscriber .InvokeAsync(eventObject) .ConfigureAwait(true); // Stay on thread! } }); } public void Publish<TEvent>(TEvent eventObject) { _ = PublishAsync(eventObject) .ContinueWith( t => { Debug.Assert( Assembly.GetEntryAssembly() == null, // Don't assert in unit tests "One or more subscribers failed to handle an event: " + t.Exception); CoreTraceSource.Log.TraceError(t.Exception); }, CancellationToken.None, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default); } //--------------------------------------------------------------------- // Inner classes. //--------------------------------------------------------------------- /// <summary> /// Base class for subscriptions. /// </summary> internal abstract class Subscription<TEvent> : ISubscription { private bool disposed = false; private readonly EventQueue queue; protected Subscription(EventQueue queue) { this.queue = queue.ExpectNotNull(nameof(queue)); } protected abstract Task InvokeCoreAsync(TEvent e); public Task InvokeAsync(TEvent e) { if (this.disposed) { // // Subscription is stale, ignore. // return Task.CompletedTask; } return InvokeCoreAsync(e); } public void Dispose() { if (!this.disposed) { var removed = this.queue.Unsubscribe<TEvent>(this); Debug.Assert(removed); this.disposed = true; } } } /// <summary> /// Normal subscription that uses a strong reference to the subscriber. /// </summary> internal sealed class StrongSubscription<TEvent> : Subscription<TEvent> { private readonly Func<TEvent, Task> callback; public StrongSubscription( EventQueue queue, Func<TEvent, Task> callback) : base(queue) { this.callback = callback.ExpectNotNull(nameof(callback)); } protected override Task InvokeCoreAsync(TEvent e) { return this.callback.Invoke(e); } } /// <summary> /// Weak subscription that avoids keeping the subscriber alive. /// </summary> internal sealed class WeakSubscription<TEvent> : Subscription<TEvent> { private readonly WeakReference<Func<TEvent, Task>> callback; public WeakSubscription( EventQueue queue, Func<TEvent, Task> callback) : base(queue) { this.callback = new WeakReference<Func<TEvent, Task>>( callback.ExpectNotNull(nameof(callback))); } protected override Task InvokeCoreAsync(TEvent e) { if (this.callback.TryGetTarget(out var target)) { return target.Invoke(e); } else { // // The subscriber is gone, remove this subscription // so that the list of subscriber doesn't grow unbounded. // Dispose(); return Task.CompletedTask; } } /// <summary> /// For testing only: Simulate that the subscriber was GC'ed. /// </summary> internal void SimulateSubscriberWasGarbageCollected() { this.callback.SetTarget(null!); } } } }