func testLastMutationSentWhenNoNetwork()

in AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/MutationQueue/OutgoingMutationQueueNetworkTests.swift [83:257]


    func testLastMutationSentWhenNoNetwork() throws {
        // NOTE: The state descriptions in this test are approximate, especially as regards the
        // values of the MutationEvent table. Processing happens asynchronously, so it is important
        // to only assert the behavior we care about (which is that the final update happens after
        // network is restored).

        var post = Post(title: "Test", content: "Newly created", createdAt: .now())
        let expectedFinalContent = "FINAL UPDATE"
        let version = AtomicValue(initialValue: 0)

        let networkUnavailable = expectation(description: "networkUnavailable")
        let networkAvailableAgain = expectation(description: "networkAvailableAgain")

        setUpNetworkStatusListener(
            fulfillingWhenNetworkUnavailable: networkUnavailable,
            fulfillingWhenNetworkAvailableAgain: networkAvailableAgain
        )

        // Set up API responder chain

        // The first response is a success for the initial "Create" mutation
        let apiRespondedWithSuccess = expectation(description: "apiRespondedWithSuccess")
        let acceptInitialMutation = setUpInitialMutationRequestResponder(
            for: try post.eraseToAnyModel(),
            fulfilling: apiRespondedWithSuccess,
            incrementing: version
        )

        // This response rejects mutations with a retriable error. This will cause the
        // SyncMutationToCloudOperation to schedule a retry for some future date (a few dozen
        // milliseconds in the future, for the first one). By that time, we will have enqueued new
        // mutations, at which point we can resume internet connectivity and ensure the API gets
        // called with the latest mutation. The delay simulates network time--this is to allow us
        // to add mutations to the pending queue while there is one in process.
        let rejectMutationsWithRetriableError = setUpRetriableErrorRequestResponder(
            listenerDelay: 0.25
        )

        // Once we've rejected some mutations due to an unreachable network, we'll allow the final
        // mutation to succeed. This is where we will assert that we've seen the last mutation
        // to be processed
        let expectedFinalContentReceived = expectation(description: "expectedFinalContentReceived")
        let acceptSubsequentMutations = setUpSubsequentMutationRequestResponder(
            for: try post.eraseToAnyModel(),
            fulfilling: expectedFinalContentReceived,
            whenContentContains: expectedFinalContent,
            incrementing: version
        )

        // Start by accepting the initial "create" mutation
        apiPlugin.responders = [.mutateRequestListener: acceptInitialMutation]

        try startAmplifyAndWaitForSync()

        // Save initial model
        let createdNewItem = expectation(description: "createdNewItem")
        Amplify.DataStore.save(post) {
            if case .failure(let error) = $0 {
                XCTAssertNil(error)
            } else {
                createdNewItem.fulfill()
            }
        }

        wait(for: [createdNewItem, apiRespondedWithSuccess], timeout: 0.1)

        // Set the responder to reject the mutation. Make sure to push a retry advice before sending
        // a new mutation.
        apiPlugin.responders = [.mutateRequestListener: rejectMutationsWithRetriableError]

        // NOTE: This policy is not used by the SyncMutationToCloudOperation, only by the
        // RemoteSyncEngine.
        requestRetryablePolicy
            .pushOnRetryRequestAdvice(
                response: RequestRetryAdvice(
                    shouldRetry: true,
                    retryInterval: .seconds(100)
                )
            )

        // We expect this to be picked up by the OutgoingMutationQueue since the network is still
        // available. However, the mutation will be rejected with a retriable error. That retry
        // will be scheduled and probably in "waiting" mode when we send the network unavailable
        // notification below.
        post.content = "Update 1"
        let savedUpdate1 = expectation(description: "savedUpdate1")
        Amplify.DataStore.save(post) {
            if case .failure(let error) = $0 {
                XCTAssertNil(error)
            } else {
                savedUpdate1.fulfill()
            }
        }
        wait(for: [savedUpdate1], timeout: 0.1)

        // At this point, the MutationEvent table (the backing store for the outgoing mutation
        // queue) has only a record for the interim update. It is marked as `inProcess: true`,
        // because the mutation queue is operational and will have picked up the item and attempted
        // to sync it to the cloud.

        // "Turn off" network. The `mockSendCompletion` notifies each subscription listener of a
        // connection error, which will cause the state machine to clean up. As part of cleanup,
        // the RemoteSyncEngine will stop the outgoing mutation queue. We will set the retry
        // advice interval to be very high, so it will be preempted by the "network available"
        // message we send later.

        reachabilitySubject.send(ReachabilityUpdate(isOnline: false))
        let noNetworkCompletion = Subscribers
            .Completion<DataStoreError>
            .failure(.sync("Test", "test", connectionError))
        MockAWSIncomingEventReconciliationQueue.mockSendCompletion(completion: noNetworkCompletion)

        // Assert that DataStore has pushed the no-network event. This isn't strictly necessary for
        // correct operation of the queue.
        wait(for: [networkUnavailable], timeout: 0.1)

        // At this point, the MutationEvent table has only a record for update1. It is marked as
        // `inProcess: false`, because the mutation queue has been fully cancelled by the cleanup
        // process.

        // Submit two more mutations. The second mutation will overwrite the "initial updated
        // content" record with new "interim" content. Neither of those will be processed by the
        // outgoing mutation queue, since the network is not available and the OutgoingMutationQueue
        // was stopped during cleanup above.

        // We expect this to be written to the queue, overwriting the existing initial update. We
        // also expect that it will be overwritten by the next mutation, without ever being synced
        // to the service.
        post.content = "Update 2"
        let savedUpdate2 = expectation(description: "savedUpdate2")
        Amplify.DataStore.save(post) {
            if case .failure(let error) = $0 {
                XCTAssertNil(error)
            } else {
                savedUpdate2.fulfill()
            }
        }
        wait(for: [savedUpdate2], timeout: 0.1)

        // At this point, the MutationEvent table has only a record for update2. It is marked as
        // `inProcess: false`, because the mutation queue has been fully cancelled.

        // Write another mutation. The current disposition behavior is that the system detects
        // a not-in-process mutation in the queue, and overwrites it with this data. The
        // reconciliation logic drops all but the oldest not-in-process mutations, which means that
        // even if there were multiple not-in-process mutations, after the reconciliation completes
        // there would only be one record in the MutationEvent table.
        post.content = expectedFinalContent
        let savedFinalUpdate = expectation(description: "savedFinalUpdate")
        Amplify.DataStore.save(post) {
            if case .failure(let error) = $0 {
                XCTAssertNil(error)
            } else {
                savedFinalUpdate.fulfill()
            }
        }
        wait(for: [savedFinalUpdate], timeout: 0.1)

        let syncStarted = expectation(description: "syncStarted")
        setUpSyncStartedListener(
            fulfillingWhenSyncStarted: syncStarted
        )

        let outboxEmpty = expectation(description: "outboxEmpty")
        setUpOutboxEmptyListener(
            fulfillingWhenOutboxEmpty: outboxEmpty
        )

        // Turn on network. This will preempt the retry timer and immediately start processing
        // the queue. We expect the mutation queue to restart, poll the MutationEvent table, pick
        // up the final update, and process it.
        apiPlugin.responders = [.mutateRequestListener: acceptSubsequentMutations]
        reachabilitySubject.send(ReachabilityUpdate(isOnline: true))
        wait(for: [networkAvailableAgain, syncStarted, outboxEmpty, expectedFinalContentReceived], timeout: 5.0)
    }