Sample/SwiftUI-MVVM/TicTacToe/Sources/ReplaySubject.swift (67 lines of code) (raw):

// // Copyright (c) 2018. Uber Technologies // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // import Combine import Foundation final class ReplaySubject<Output, Failure: Error>: Subject { private var buffer = [Output]() private let bufferSize: Int private let lock = NSRecursiveLock() private var subscriptions = [ReplaySubjectSubscription<Output, Failure>]() private var completion: Subscribers.Completion<Failure>? init(_ bufferSize: Int = 0) { self.bufferSize = bufferSize } } extension ReplaySubject { func receive<Downstream: Subscriber>(subscriber: Downstream) where Downstream.Failure == Failure, Downstream.Input == Output { lock.lock(); defer { lock.unlock() } let subscription = ReplaySubjectSubscription<Output, Failure>(downstream: AnySubscriber(subscriber)) subscriber.receive(subscription: subscription) subscriptions.append(subscription) subscription.replay(buffer, completion: completion) } } extension ReplaySubject { /// Establishes demand for a new upstream subscriptions func send(subscription: Subscription) { lock.lock(); defer { lock.unlock() } subscription.request(.unlimited) } /// Sends a value to the subscriber. func send(_ value: Output) { lock.lock(); defer { lock.unlock() } buffer.append(value) buffer = buffer.suffix(bufferSize) subscriptions.forEach { $0.receive(value) } } /// Sends a completion event to the subscriber. func send(completion: Subscribers.Completion<Failure>) { lock.lock(); defer { lock.unlock() } self.completion = completion subscriptions.forEach { subscription in subscription.receive(completion: completion) } } } final class ReplaySubjectSubscription<Output, Failure: Error>: Subscription { private let downstream: AnySubscriber<Output, Failure> private var isCompleted = false private var demand: Subscribers.Demand = .none init(downstream: AnySubscriber<Output, Failure>) { self.downstream = downstream } func request(_ newDemand: Subscribers.Demand) { demand += newDemand } func cancel() { isCompleted = true } func receive(_ value: Output) { guard !isCompleted, demand > 0 else { return } demand += downstream.receive(value) demand -= 1 } func receive(completion: Subscribers.Completion<Failure>) { guard !isCompleted else { return } isCompleted = true downstream.receive(completion: completion) } func replay(_ values: [Output], completion: Subscribers.Completion<Failure>?) { guard !isCompleted else { return } values.forEach { value in receive(value) } if let completion = completion { receive(completion: completion) } } }