in aws-api/src/main/java/com/amplifyframework/api/aws/SubscriptionEndpoint.java [108:187]
/**
 * Requests a new GraphQL subscription over this endpoint's web socket.
 *
 * <p>If there is no web socket listener yet, or the current one reports a disconnected
 * state, a new web socket connection is opened first. The call then blocks until the
 * listener reports the connection as ready or failed, sends a {@code start} message
 * carrying a freshly generated subscription ID, registers a {@code Subscription} under
 * that ID, and finally blocks until the subscription reports itself ready.
 *
 * <p>Failures detected by this method are delivered to {@code onSubscriptionError};
 * after the subscription ID has been generated, an error is only emitted while that ID
 * is still in the pending set.
 *
 * @param request the GraphQL request whose content is sent as the subscription payload
 * @param authType the authorization type used to build the connection URL and the
 *                 authorization headers of the {@code start} message
 * @param onSubscriptionStarted receives the subscription ID once the subscription is ready
 * @param onNextItem handed to the created {@code Subscription}; presumably invoked for each
 *                   received item (not visible in this method)
 * @param onSubscriptionError receives errors raised while connecting or registering; also
 *                            handed to the created {@code Subscription}
 * @param onSubscriptionComplete handed to the created {@code Subscription}; presumably invoked
 *                               on completion (not visible in this method)
 * @param <T> the type of data carried by the subscription responses
 * @throws NullPointerException if any argument is null
 */
synchronized <T> void requestSubscription(
        @NonNull GraphQLRequest<T> request,
        @NonNull AuthorizationType authType,
        @NonNull Consumer<String> onSubscriptionStarted,
        @NonNull Consumer<GraphQLResponse<T>> onNextItem,
        @NonNull Consumer<ApiException> onSubscriptionError,
        @NonNull Action onSubscriptionComplete) {
    Objects.requireNonNull(request);
    Objects.requireNonNull(authType);
    Objects.requireNonNull(onSubscriptionStarted);
    Objects.requireNonNull(onNextItem);
    Objects.requireNonNull(onSubscriptionError);
    Objects.requireNonNull(onSubscriptionComplete);

    // The first call to subscribe OR a disconnected websocket listener will
    // force a new connection to be created.
    if (webSocketListener == null || webSocketListener.isDisconnectedState()) {
        try {
            // Build the request before touching any state: if the URL cannot be built,
            // the previous (null or disconnected) listener stays in place so that the
            // next call attempts to connect again instead of waiting on a listener that
            // was never attached to a socket.
            final Request connectionRequest = new Request.Builder()
                    .url(buildConnectionRequestUrl(authType))
                    .addHeader("Sec-WebSocket-Protocol", "graphql-ws")
                    .build();
            webSocketListener = new AmplifyWebSocketListener();
            webSocket = okHttpClient.newWebSocket(connectionRequest, webSocketListener);
        } catch (ApiException apiException) {
            onSubscriptionError.accept(apiException);
            return;
        }
    }

    final String subscriptionId = UUID.randomUUID().toString();
    pendingSubscriptionIds.add(subscriptionId);

    // Every request waits here for the connection to be ready.
    Connection connection = webSocketListener.waitForConnectionReady();
    if (connection.hasFailure()) {
        // Only emit an error if the subscription was still pending.
        if (pendingSubscriptionIds.remove(subscriptionId)) {
            onSubscriptionError.accept(
                    new ApiException(connection.getFailureReason(), AmplifyException.TODO_RECOVERY_SUGGESTION));
        }
        // Never try to register a subscription over a connection that failed.
        return;
    }

    try {
        webSocket.send(new JSONObject()
                .put("id", subscriptionId)
                .put("type", "start")
                .put("payload", new JSONObject()
                        .put("data", request.getContent())
                        .put("extensions", new JSONObject()
                                .put("authorization", authorizer.createHeadersForSubscription(request, authType))))
                .toString()
        );
    } catch (JSONException | ApiException exception) {
        // If the subscriptionId was still pending, then we can call the onSubscriptionError
        if (pendingSubscriptionIds.remove(subscriptionId)) {
            if (exception instanceof ApiAuthException) {
                // Don't wrap it if it's an ApiAuthException.
                onSubscriptionError.accept((ApiAuthException) exception);
            } else {
                onSubscriptionError.accept(new ApiException(
                        "Failed to construct subscription registration message.",
                        exception,
                        AmplifyException.TODO_RECOVERY_SUGGESTION
                ));
            }
        }
        return;
    }

    Subscription<T> subscription = new Subscription<>(
            onNextItem, onSubscriptionError, onSubscriptionComplete,
            responseFactory, request.getResponseType(), request
    );
    // NOTE(review): the subscription is registered only after the start message has been
    // sent. If the listener can process the acknowledgment before this line runs, it may
    // not find the subscription - confirm against the listener implementation.
    subscriptions.put(subscriptionId, subscription);
    if (subscription.awaitSubscriptionReady()) {
        pendingSubscriptionIds.remove(subscriptionId);
        onSubscriptionStarted.accept(subscriptionId);
    }
}