func testSubscriptionIsInvokedOnProvidedQueue()

in AWSAppSyncIntegrationTests/SubscriptionTests.swift [154:235]


    /// Verifies that the subscription result handler passed to `sync` is invoked on the caller-supplied
    /// `callbackQueue`.
    ///
    /// The test tags a dedicated dispatch queue with a queue-specific value, starts a sync operation that is
    /// given that queue as its `callbackQueue`, triggers the subscription by upvoting a newly created post, and
    /// checks from inside the subscription handler that the tag is visible on the currently executing queue.
    ///
    /// - Throws: Any error thrown by `SubscriptionTests.makeAppSyncClient(authType:)`.
    func testSubscriptionIsInvokedOnProvidedQueue() throws {
        // Tag the callback queue so that code running on it can be recognized via `DispatchQueue.getSpecific`.
        // NOTE(review): the label names a different test (looks copy-pasted); left unchanged because it is only
        // used as the queue label and as the identity value compared below.
        let label = "testSyncOperationAtSetupAndReconnect.syncWatcherCallbackQueue"
        let syncWatcherCallbackQueue = DispatchQueue(label: label)
        let queueIdentityKey = DispatchSpecificKey<String>()
        let queueIdentityValue = label
        syncWatcherCallbackQueue.setSpecific(key: queueIdentityKey, value: queueIdentityValue)

        let appSyncClient = try SubscriptionTests.makeAppSyncClient(authType: self.authType)

        // The upvote further down is sent through the test case's own client. Unwrap it up front so that a
        // missing client fails immediately instead of silently skipping the mutation and timing out later.
        guard let mutationClient = self.appSyncClient else {
            XCTFail("Expected the test case's appSyncClient to be available for the upvote mutation")
            return
        }

        let postCreated = expectation(description: "Post created successfully.")
        let addPost = DefaultTestPostData.defaultCreatePostWithoutFileUsingParametersMutation
        var idHolder: GraphQLID?
        appSyncClient.perform(mutation: addPost, queue: SubscriptionTests.mutationQueue) { result, error in
            print("CreatePost result handler invoked")
            XCTAssertNil(error, "Error performing create post mutation")
            idHolder = result?.data?.createPostWithoutFileUsingParameters?.id
            postCreated.fulfill()
        }
        wait(for: [postCreated], timeout: SubscriptionTests.networkOperationTimeout)

        guard let id = idHolder else {
            XCTFail("Expected ID from addPost mutation")
            return
        }

        // We use the base query result handler to know that the subscription is active. Delta Sync does not attempt to
        // perform a server query until the subscription is established, to ensure that no data is lost between the time
        // we begin establishing a sync connection and the time we finish the base query
        let baseQueryFetchFromServerComplete = expectation(description: "BaseQuery fetch from server complete")
        let baseQueryResultHandler: (GraphQLResult<ListPostsQuery.Data>?, Error?) -> Void = { result, _ in
            guard let result = result else {
                return
            }
            if result.source == .server {
                baseQueryFetchFromServerComplete.fulfill()
            }
        }

        // Delta query results are irrelevant to this test.
        let deltaQueryResultHandler: (GraphQLResult<ListPostsQuery.Data>?, ApolloStore.ReadWriteTransaction?, Error?) -> Void = { _, _, _ in }

        let subscriptionResultHandlerInvoked = expectation(description: "Subscription result handler invoked")
        let subscriptionResultHandler: (GraphQLResult<OnUpvotePostSubscription.Data>?, ApolloStore.ReadWriteTransaction?, Error?) -> Void = { _, _, _ in
            // Check the queue *before* fulfilling: once the expectation is fulfilled the waiting test may finish,
            // and an assertion recorded after that point would not be reliably attributed to this test.
            let dispatchQueueValue = DispatchQueue.getSpecific(key: queueIdentityKey)
            XCTAssertEqual(dispatchQueueValue, queueIdentityValue, "Expected callback to be invoked on provided queue")
            subscriptionResultHandlerInvoked.fulfill()
        }

        let listPostsQuery = ListPostsQuery()
        let subscription = OnUpvotePostSubscription(id: id)

        let syncWatcher = appSyncClient.sync(
            baseQuery: listPostsQuery,
            baseQueryResultHandler: baseQueryResultHandler,
            subscription: subscription,
            subscriptionResultHandler: subscriptionResultHandler,
            deltaQuery: listPostsQuery,
            deltaQueryResultHandler: deltaQueryResultHandler,
            callbackQueue: syncWatcherCallbackQueue,
            syncConfiguration: SyncConfiguration()
        )

        // Cancel the sync operation on every exit path from here on.
        defer {
            syncWatcher.cancel()
        }

        wait(for: [baseQueryFetchFromServerComplete], timeout: SubscriptionTests.networkOperationTimeout)

        // Upvote the post we created to trigger the subscription.
        let upvote = UpvotePostMutation(id: id)
        let upvoteComplete = expectation(description: "Upvote mutation completed")

        mutationClient.perform(mutation: upvote, queue: SubscriptionTests.mutationQueue) { _, error in
            XCTAssertNil(error, "Error performing upvote mutation")
            upvoteComplete.fulfill()
        }

        wait(
            for: [
                upvoteComplete,
                subscriptionResultHandlerInvoked,
                ],
            timeout: SubscriptionTests.networkOperationTimeout
        )
    }