in AWSAppSyncIntegrationTests/SubscriptionTests.swift [154:235]
/// Verifies that the subscription result handler passed to `sync` is invoked on the
/// caller-supplied `callbackQueue`.
///
/// Flow: create a post, start a sync operation whose subscription is built from that post's ID,
/// wait for the base query to report a server result, upvote the post, and check that the
/// subscription handler runs on the queue tagged with a queue-specific value.
///
/// - Throws: Any error thrown by `SubscriptionTests.makeAppSyncClient(authType:)`.
func testSubscriptionIsInvokedOnProvidedQueue() throws {
    // Tag the callback queue with a queue-specific value so the handler can tell which queue
    // it is running on.
    let label = "testSyncOperationAtSetupAndReconnect.syncWatcherCallbackQueue"
    let syncWatcherCallbackQueue = DispatchQueue(label: label)
    let queueIdentityKey = DispatchSpecificKey<String>()
    let queueIdentityValue = label
    syncWatcherCallbackQueue.setSpecific(key: queueIdentityKey, value: queueIdentityValue)

    let appSyncClient = try SubscriptionTests.makeAppSyncClient(authType: self.authType)

    // Create the post that the subscription will observe.
    let postCreated = expectation(description: "Post created successfully.")
    let addPost = DefaultTestPostData.defaultCreatePostWithoutFileUsingParametersMutation
    var idHolder: GraphQLID?
    appSyncClient.perform(mutation: addPost, queue: SubscriptionTests.mutationQueue) { result, error in
        print("CreatePost result handler invoked")
        idHolder = result?.data?.createPostWithoutFileUsingParameters?.id
        postCreated.fulfill()
    }
    wait(for: [postCreated], timeout: SubscriptionTests.networkOperationTimeout)

    guard let id = idHolder else {
        XCTFail("Expected ID from addPost mutation")
        return
    }

    // We use the base query result handler to know that the subscription is active. Delta Sync does not attempt to
    // perform a server query until the subscription is established, to ensure that no data is lost between the time
    // we begin establishing a sync connection and the time we finish the base query
    let baseQueryFetchFromServerComplete = expectation(description: "BaseQuery fetch from server complete")
    let baseQueryResultHandler: (GraphQLResult<ListPostsQuery.Data>?, Error?) -> Void = { result, _ in
        guard let result = result else {
            return
        }
        if result.source == .server {
            baseQueryFetchFromServerComplete.fulfill()
        }
    }

    // Delta query results are irrelevant to this test.
    let deltaQueryResultHandler: (GraphQLResult<ListPostsQuery.Data>?, ApolloStore.ReadWriteTransaction?, Error?) -> Void = { _, _, _ in }

    let subscriptionResultHandlerInvoked = expectation(description: "Subscription result handler invoked")
    let subscriptionResultHandler: (GraphQLResult<OnUpvotePostSubscription.Data>?, ApolloStore.ReadWriteTransaction?, Error?) -> Void = { _, _, _ in
        // Assert before fulfilling: once the expectation is fulfilled the waiting test may
        // resume and finish, so a failure recorded afterwards could be missed.
        let dispatchQueueValue = DispatchQueue.getSpecific(key: queueIdentityKey)
        XCTAssertEqual(dispatchQueueValue, queueIdentityValue, "Expected callback to be invoked on provided queue")
        subscriptionResultHandlerInvoked.fulfill()
    }

    let listPostsQuery = ListPostsQuery()
    let subscription = OnUpvotePostSubscription(id: id)
    let syncWatcher = appSyncClient.sync(
        baseQuery: listPostsQuery,
        baseQueryResultHandler: baseQueryResultHandler,
        subscription: subscription,
        subscriptionResultHandler: subscriptionResultHandler,
        deltaQuery: listPostsQuery,
        deltaQueryResultHandler: deltaQueryResultHandler,
        callbackQueue: syncWatcherCallbackQueue,
        syncConfiguration: SyncConfiguration()
    )
    defer {
        syncWatcher.cancel()
    }

    wait(for: [baseQueryFetchFromServerComplete], timeout: SubscriptionTests.networkOperationTimeout)

    // Fail explicitly if the instance client is missing; optional chaining would silently skip
    // the mutation and surface only as a timeout on the expectations below.
    guard let upvotingClient = self.appSyncClient else {
        XCTFail("Expected appSyncClient to be available to perform the upvote mutation")
        return
    }

    // Upvote the post to trigger the subscription.
    let upvote = UpvotePostMutation(id: id)
    let upvoteComplete = expectation(description: "Upvote mutation completed")
    upvotingClient.perform(mutation: upvote, queue: SubscriptionTests.mutationQueue) { _, _ in
        upvoteComplete.fulfill()
    }

    wait(
        for: [
            upvoteComplete,
            subscriptionResultHandlerInvoked,
        ],
        timeout: SubscriptionTests.networkOperationTimeout
    )
}