func testProcessesNewEvents()

in AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/SubscriptionSync/ModelReconciliationQueueBehaviorTests.swift [303:411]


    func testProcessesNewEvents() throws {
        // Return a successful MockSynced save
        storageAdapter.responders[.saveUntypedModel] = SaveUntypedModelResponder { model, completion in
            completion(.success(model))
        }

        let event1ShouldBeProcessed = expectation(description: "Event 1 should be processed")
        let event2ShouldBeProcessed = expectation(description: "Event 2 should be processed")
        storageAdapter.responders[.saveModelCompletion] =
            SaveModelCompletionResponder<MutationSyncMetadata> { mutationSyncMetadata, completion in
                switch mutationSyncMetadata.modelId {
                case "id-1":
                    event1ShouldBeProcessed.fulfill()
                case "id-2":
                    event2ShouldBeProcessed.fulfill()
                default:
                    break
                }
                completion(.success(mutationSyncMetadata))
        }

        let queue = AWSModelReconciliationQueue(modelSchema: MockSynced.schema,
                                                storageAdapter: storageAdapter,
                                                api: apiPlugin,
                                                reconcileAndSaveQueue: reconcileAndSaveQueue,
                                                modelPredicate: modelPredicate,
                                                auth: authPlugin,
                                                authModeStrategy: AWSDefaultAuthModeStrategy(),
                                                incomingSubscriptionEvents: subscriptionEventsPublisher)
        for iteration in 1 ... 2 {
            let model = try MockSynced(id: "id-\(iteration)").eraseToAnyModel()
            let syncMetadata = MutationSyncMetadata(modelId: model.id,
                                                    modelName: model.modelName,
                                                    deleted: false,
                                                    lastChangedAt: Date().unixSeconds,
                                                    version: 1)
            let mutationSync = MutationSync(model: model, syncMetadata: syncMetadata)
            subscriptionEventsSubject.send(.mutationEvent(mutationSync))
        }

        let eventsSentViaPublisher1 = expectation(description: "id-1 sent via publisher")
        let eventsSentViaPublisher2 = expectation(description: "id-2 sent via publisher")
        var queueSink = queue.publisher.sink(receiveCompletion: { _ in
            XCTFail("Not expecting a call to completion")
        }, receiveValue: { event in
            if case let .mutationEvent(mutationEvent) = event {
                switch mutationEvent.modelId {
                case "id-1":
                    eventsSentViaPublisher1.fulfill()
                case "id-2":
                    eventsSentViaPublisher2.fulfill()
                default:
                    XCTFail("Not expecting a call to default")
                }
            }
        })

        queue.start()

        wait(for: [event1ShouldBeProcessed,
                   event2ShouldBeProcessed,
                   eventsSentViaPublisher1,
                   eventsSentViaPublisher2], timeout: 1.0)

        let event1ShouldNotBeProcessed = expectation(description: "Event 1 should not be processed")
        event1ShouldNotBeProcessed.isInverted = true
        let event2ShouldNotBeProcessed = expectation(description: "Event 2 should not be processed")
        event2ShouldNotBeProcessed.isInverted = true
        let event3ShouldBeProcessed = expectation(description: "Event 3 should be processed")
        storageAdapter.responders[.saveModelCompletion] =
            SaveModelCompletionResponder<MutationSyncMetadata> { mutationSyncMetadata, completion in
                switch mutationSyncMetadata.modelId {
                case "id-1":
                    event1ShouldNotBeProcessed.fulfill()
                case "id-2":
                    event2ShouldNotBeProcessed.fulfill()
                case "id-3":
                    event3ShouldBeProcessed.fulfill()
                default:
                    break
                }
                completion(.success(mutationSyncMetadata))
        }

        let eventsSentViaPublisher3 = expectation(description: "id-3 sent via publisher")
        queueSink = queue.publisher.sink(receiveCompletion: { _ in
            XCTFail("Not expecting a call to completion")
        }, receiveValue: { event in
            if case let .mutationEvent(mutationEvent) = event {
                if mutationEvent.modelId == "id-3" {
                    eventsSentViaPublisher3.fulfill()
                }
            }
        })

        let model = try MockSynced(id: "id-3").eraseToAnyModel()
        let syncMetadata = MutationSyncMetadata(modelId: model.id,
                                                modelName: model.modelName,
                                                deleted: false,
                                                lastChangedAt: Date().unixSeconds,
                                                version: 1)
        let mutationSync = MutationSync(model: model, syncMetadata: syncMetadata)
        subscriptionEventsSubject.send(.mutationEvent(mutationSync))

        wait(for: [event1ShouldNotBeProcessed,
                   event2ShouldNotBeProcessed,
                   event3ShouldBeProcessed,
                   eventsSentViaPublisher3], timeout: 1.0)
    }