in AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/SubscriptionSync/ModelReconciliationQueueBehaviorTests.swift [303:411]
func testProcessesNewEvents() throws {
// Return a successful MockSynced save
storageAdapter.responders[.saveUntypedModel] = SaveUntypedModelResponder { model, completion in
completion(.success(model))
}
let event1ShouldBeProcessed = expectation(description: "Event 1 should be processed")
let event2ShouldBeProcessed = expectation(description: "Event 2 should be processed")
storageAdapter.responders[.saveModelCompletion] =
SaveModelCompletionResponder<MutationSyncMetadata> { mutationSyncMetadata, completion in
switch mutationSyncMetadata.modelId {
case "id-1":
event1ShouldBeProcessed.fulfill()
case "id-2":
event2ShouldBeProcessed.fulfill()
default:
break
}
completion(.success(mutationSyncMetadata))
}
let queue = AWSModelReconciliationQueue(modelSchema: MockSynced.schema,
storageAdapter: storageAdapter,
api: apiPlugin,
reconcileAndSaveQueue: reconcileAndSaveQueue,
modelPredicate: modelPredicate,
auth: authPlugin,
authModeStrategy: AWSDefaultAuthModeStrategy(),
incomingSubscriptionEvents: subscriptionEventsPublisher)
for iteration in 1 ... 2 {
let model = try MockSynced(id: "id-\(iteration)").eraseToAnyModel()
let syncMetadata = MutationSyncMetadata(modelId: model.id,
modelName: model.modelName,
deleted: false,
lastChangedAt: Date().unixSeconds,
version: 1)
let mutationSync = MutationSync(model: model, syncMetadata: syncMetadata)
subscriptionEventsSubject.send(.mutationEvent(mutationSync))
}
let eventsSentViaPublisher1 = expectation(description: "id-1 sent via publisher")
let eventsSentViaPublisher2 = expectation(description: "id-2 sent via publisher")
var queueSink = queue.publisher.sink(receiveCompletion: { _ in
XCTFail("Not expecting a call to completion")
}, receiveValue: { event in
if case let .mutationEvent(mutationEvent) = event {
switch mutationEvent.modelId {
case "id-1":
eventsSentViaPublisher1.fulfill()
case "id-2":
eventsSentViaPublisher2.fulfill()
default:
XCTFail("Not expecting a call to default")
}
}
})
queue.start()
wait(for: [event1ShouldBeProcessed,
event2ShouldBeProcessed,
eventsSentViaPublisher1,
eventsSentViaPublisher2], timeout: 1.0)
let event1ShouldNotBeProcessed = expectation(description: "Event 1 should not be processed")
event1ShouldNotBeProcessed.isInverted = true
let event2ShouldNotBeProcessed = expectation(description: "Event 2 should not be processed")
event2ShouldNotBeProcessed.isInverted = true
let event3ShouldBeProcessed = expectation(description: "Event 3 should be processed")
storageAdapter.responders[.saveModelCompletion] =
SaveModelCompletionResponder<MutationSyncMetadata> { mutationSyncMetadata, completion in
switch mutationSyncMetadata.modelId {
case "id-1":
event1ShouldNotBeProcessed.fulfill()
case "id-2":
event2ShouldNotBeProcessed.fulfill()
case "id-3":
event3ShouldBeProcessed.fulfill()
default:
break
}
completion(.success(mutationSyncMetadata))
}
let eventsSentViaPublisher3 = expectation(description: "id-3 sent via publisher")
queueSink = queue.publisher.sink(receiveCompletion: { _ in
XCTFail("Not expecting a call to completion")
}, receiveValue: { event in
if case let .mutationEvent(mutationEvent) = event {
if mutationEvent.modelId == "id-3" {
eventsSentViaPublisher3.fulfill()
}
}
})
let model = try MockSynced(id: "id-3").eraseToAnyModel()
let syncMetadata = MutationSyncMetadata(modelId: model.id,
modelName: model.modelName,
deleted: false,
lastChangedAt: Date().unixSeconds,
version: 1)
let mutationSync = MutationSync(model: model, syncMetadata: syncMetadata)
subscriptionEventsSubject.send(.mutationEvent(mutationSync))
wait(for: [event1ShouldNotBeProcessed,
event2ShouldNotBeProcessed,
event3ShouldBeProcessed,
eventsSentViaPublisher3], timeout: 1.0)
}