in AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/MutationQueue/OutgoingMutationQueueNetworkTests.swift [83:257]
/// Verifies that the most recently saved mutation is the one delivered to the API after
/// network connectivity is restored.
///
/// Flow: save a new post (accepted by the first API responder), save "Update 1" while the
/// API responder rejects mutations with a retriable error, signal that the network is
/// unavailable, save two further updates while offline, then signal that the network is
/// available again and wait for the API responder to observe `expectedFinalContent`.
///
/// - Throws: Any error thrown by `eraseToAnyModel()` or `startAmplifyAndWaitForSync()`.
func testLastMutationSentWhenNoNetwork() throws {
    // NOTE: The state descriptions in this test are approximate, especially as regards the
    // values of the MutationEvent table. Processing happens asynchronously, so it is important
    // to only assert the behavior we care about (which is that the final update happens after
    // network is restored).

    // Upper bound for each intermediate step. `wait(for:timeout:)` returns as soon as its
    // expectations are fulfilled, so a generous bound does not slow down a passing run; it only
    // keeps a loaded machine from failing the test on steps we do not care to time.
    let stepTimeout = 1.0

    // Saves `model` to DataStore, fulfilling `expectation` on success and recording a test
    // failure that includes the underlying error otherwise.
    func save(_ model: Post, fulfilling expectation: XCTestExpectation, step: String) {
        Amplify.DataStore.save(model) { result in
            switch result {
            case .success:
                expectation.fulfill()
            case .failure(let error):
                XCTFail("DataStore.save failed for \(step): \(error)")
            }
        }
    }

    var post = Post(title: "Test", content: "Newly created", createdAt: .now())
    let expectedFinalContent = "FINAL UPDATE"
    let version = AtomicValue(initialValue: 0)

    let networkUnavailable = expectation(description: "networkUnavailable")
    let networkAvailableAgain = expectation(description: "networkAvailableAgain")
    setUpNetworkStatusListener(
        fulfillingWhenNetworkUnavailable: networkUnavailable,
        fulfillingWhenNetworkAvailableAgain: networkAvailableAgain
    )

    // Set up API responder chain

    // The first response is a success for the initial "Create" mutation
    let apiRespondedWithSuccess = expectation(description: "apiRespondedWithSuccess")
    let acceptInitialMutation = setUpInitialMutationRequestResponder(
        for: try post.eraseToAnyModel(),
        fulfilling: apiRespondedWithSuccess,
        incrementing: version
    )

    // This response rejects mutations with a retriable error. This will cause the
    // SyncMutationToCloudOperation to schedule a retry for some future date (a few dozen
    // milliseconds in the future, for the first one). By that time, we will have enqueued new
    // mutations, at which point we can resume internet connectivity and ensure the API gets
    // called with the latest mutation. The delay simulates network time--this is to allow us
    // to add mutations to the pending queue while there is one in process.
    let rejectMutationsWithRetriableError = setUpRetriableErrorRequestResponder(
        listenerDelay: 0.25
    )

    // Once we've rejected some mutations due to an unreachable network, we'll allow the final
    // mutation to succeed. This is where we will assert that we've seen the last mutation
    // to be processed
    let expectedFinalContentReceived = expectation(description: "expectedFinalContentReceived")
    let acceptSubsequentMutations = setUpSubsequentMutationRequestResponder(
        for: try post.eraseToAnyModel(),
        fulfilling: expectedFinalContentReceived,
        whenContentContains: expectedFinalContent,
        incrementing: version
    )

    // Start by accepting the initial "create" mutation
    apiPlugin.responders = [.mutateRequestListener: acceptInitialMutation]
    try startAmplifyAndWaitForSync()

    // Save initial model
    let createdNewItem = expectation(description: "createdNewItem")
    save(post, fulfilling: createdNewItem, step: "initial create")
    wait(for: [createdNewItem, apiRespondedWithSuccess], timeout: stepTimeout)

    // Set the responder to reject the mutation. Make sure to push a retry advice before sending
    // a new mutation.
    apiPlugin.responders = [.mutateRequestListener: rejectMutationsWithRetriableError]

    // NOTE: This policy is not used by the SyncMutationToCloudOperation, only by the
    // RemoteSyncEngine.
    requestRetryablePolicy
        .pushOnRetryRequestAdvice(
            response: RequestRetryAdvice(
                shouldRetry: true,
                retryInterval: .seconds(100)
            )
        )

    // We expect this to be picked up by the OutgoingMutationQueue since the network is still
    // available. However, the mutation will be rejected with a retriable error. That retry
    // will be scheduled and probably in "waiting" mode when we send the network unavailable
    // notification below.
    post.content = "Update 1"
    let savedUpdate1 = expectation(description: "savedUpdate1")
    save(post, fulfilling: savedUpdate1, step: "update 1")
    wait(for: [savedUpdate1], timeout: stepTimeout)

    // At this point, the MutationEvent table (the backing store for the outgoing mutation
    // queue) has only a record for the interim update. It is marked as `inProcess: true`,
    // because the mutation queue is operational and will have picked up the item and attempted
    // to sync it to the cloud.

    // "Turn off" network. The `mockSendCompletion` notifies each subscription listener of a
    // connection error, which will cause the state machine to clean up. As part of cleanup,
    // the RemoteSyncEngine will stop the outgoing mutation queue. We will set the retry
    // advice interval to be very high, so it will be preempted by the "network available"
    // message we send later.
    reachabilitySubject.send(ReachabilityUpdate(isOnline: false))
    let noNetworkCompletion = Subscribers
        .Completion<DataStoreError>
        .failure(.sync("Test", "test", connectionError))
    MockAWSIncomingEventReconciliationQueue.mockSendCompletion(completion: noNetworkCompletion)

    // Assert that DataStore has pushed the no-network event. This isn't strictly necessary for
    // correct operation of the queue.
    wait(for: [networkUnavailable], timeout: stepTimeout)

    // At this point, the MutationEvent table has only a record for update1. It is marked as
    // `inProcess: false`, because the mutation queue has been fully cancelled by the cleanup
    // process.

    // Submit two more mutations. The second mutation will overwrite the "initial updated
    // content" record with new "interim" content. Neither of those will be processed by the
    // outgoing mutation queue, since the network is not available and the OutgoingMutationQueue
    // was stopped during cleanup above.

    // We expect this to be written to the queue, overwriting the existing initial update. We
    // also expect that it will be overwritten by the next mutation, without ever being synced
    // to the service.
    post.content = "Update 2"
    let savedUpdate2 = expectation(description: "savedUpdate2")
    save(post, fulfilling: savedUpdate2, step: "update 2")
    wait(for: [savedUpdate2], timeout: stepTimeout)

    // At this point, the MutationEvent table has only a record for update2. It is marked as
    // `inProcess: false`, because the mutation queue has been fully cancelled.

    // Write another mutation. The current disposition behavior is that the system detects
    // a not-in-process mutation in the queue, and overwrites it with this data. The
    // reconciliation logic drops all but the oldest not-in-process mutations, which means that
    // even if there were multiple not-in-process mutations, after the reconciliation completes
    // there would only be one record in the MutationEvent table.
    post.content = expectedFinalContent
    let savedFinalUpdate = expectation(description: "savedFinalUpdate")
    save(post, fulfilling: savedFinalUpdate, step: "final update")
    wait(for: [savedFinalUpdate], timeout: stepTimeout)

    // Register the listeners before restoring the network so their events cannot be missed.
    let syncStarted = expectation(description: "syncStarted")
    setUpSyncStartedListener(
        fulfillingWhenSyncStarted: syncStarted
    )

    let outboxEmpty = expectation(description: "outboxEmpty")
    setUpOutboxEmptyListener(
        fulfillingWhenOutboxEmpty: outboxEmpty
    )

    // Turn on network. This will preempt the retry timer and immediately start processing
    // the queue. We expect the mutation queue to restart, poll the MutationEvent table, pick
    // up the final update, and process it.
    apiPlugin.responders = [.mutateRequestListener: acceptSubsequentMutations]
    reachabilitySubject.send(ReachabilityUpdate(isOnline: true))

    wait(for: [networkAvailableAgain, syncStarted, outboxEmpty, expectedFinalContentReceived], timeout: 5.0)
}