synchronized void requestSubscription()

in aws-api/src/main/java/com/amplifyframework/api/aws/SubscriptionEndpoint.java [108:187]


    /**
     * Registers a GraphQL subscription over the shared web socket, opening a new
     * socket first if there is no listener yet or the current one is disconnected.
     *
     * <p>The method is {@code synchronized} and blocks the caller: first until the
     * web socket listener reports a connection result, then until the newly created
     * {@link Subscription} reports whether it is ready.
     *
     * @param request                GraphQL request whose content is sent as the subscription payload
     * @param authType               authorization type used both to build the connection URL and to
     *                               create the authorization headers for the start message
     * @param onSubscriptionStarted  receives the generated subscription ID once the subscription is ready
     * @param onNextItem             handed to the {@link Subscription} together with the other callbacks
     * @param onSubscriptionError    receives an {@link ApiException} when the connection request cannot be
     *                               built, the connection fails, or the start message cannot be created;
     *                               also handed to the {@link Subscription}
     * @param onSubscriptionComplete handed to the {@link Subscription} together with the other callbacks
     * @param <T>                    type of data carried by the subscription responses
     * @throws NullPointerException if any argument is {@code null}
     */
    synchronized <T> void requestSubscription(
            @NonNull GraphQLRequest<T> request,
            @NonNull AuthorizationType authType,
            @NonNull Consumer<String> onSubscriptionStarted,
            @NonNull Consumer<GraphQLResponse<T>> onNextItem,
            @NonNull Consumer<ApiException> onSubscriptionError,
            @NonNull Action onSubscriptionComplete) {
        Objects.requireNonNull(request, "request");
        Objects.requireNonNull(authType, "authType");
        Objects.requireNonNull(onSubscriptionStarted, "onSubscriptionStarted");
        Objects.requireNonNull(onNextItem, "onNextItem");
        Objects.requireNonNull(onSubscriptionError, "onSubscriptionError");
        Objects.requireNonNull(onSubscriptionComplete, "onSubscriptionComplete");

        // The first call to subscribe OR a disconnected websocket listener will
        // force a new connection to be created.
        if (webSocketListener == null || webSocketListener.isDisconnectedState()) {
            // Build the request BEFORE replacing the listener: if the URL cannot be
            // built, the listener/socket fields are left untouched, so we never keep
            // a listener that was not attached to any socket.
            final Request connectionRequest;
            try {
                connectionRequest = new Request.Builder()
                    .url(buildConnectionRequestUrl(authType))
                    .addHeader("Sec-WebSocket-Protocol", "graphql-ws")
                    .build();
            } catch (ApiException apiException) {
                onSubscriptionError.accept(apiException);
                return;
            }
            webSocketListener = new AmplifyWebSocketListener();
            webSocket = okHttpClient.newWebSocket(connectionRequest, webSocketListener);
        }

        final String subscriptionId = UUID.randomUUID().toString();
        pendingSubscriptionIds.add(subscriptionId);

        // Every request waits here for the connection to be ready.
        Connection connection = webSocketListener.waitForConnectionReady();
        if (connection.hasFailure()) {
            // Only report the error if the ID is still pending, i.e. nobody else has
            // already removed it from the pending set.
            if (pendingSubscriptionIds.remove(subscriptionId)) {
                onSubscriptionError.accept(
                    new ApiException(connection.getFailureReason(), AmplifyException.TODO_RECOVERY_SUGGESTION));
            }
            // Nothing can be sent over a failed connection, so stop here whether or
            // not the error was reported (mirrors the catch block below).
            return;
        }

        try {
            webSocket.send(new JSONObject()
                .put("id", subscriptionId)
                .put("type", "start")
                .put("payload", new JSONObject()
                    .put("data", request.getContent())
                    .put("extensions", new JSONObject()
                        .put("authorization", authorizer.createHeadersForSubscription(request, authType))))
                .toString()
            );
        } catch (JSONException | ApiException exception) {
            // If the subscriptionId was still pending, then we can call the onSubscriptionError
            if (pendingSubscriptionIds.remove(subscriptionId)) {
                if (exception instanceof ApiAuthException) {
                    // Don't wrap it if it's an ApiAuthException.
                    onSubscriptionError.accept((ApiAuthException) exception);
                } else {
                    onSubscriptionError.accept(new ApiException(
                        "Failed to construct subscription registration message.",
                        exception,
                        AmplifyException.TODO_RECOVERY_SUGGESTION
                    ));
                }
            }
            return;
        }

        Subscription<T> subscription = new Subscription<>(
            onNextItem, onSubscriptionError, onSubscriptionComplete,
            responseFactory, request.getResponseType(), request
        );
        subscriptions.put(subscriptionId, subscription);
        // Blocks until the subscription reports whether it became ready; the started
        // callback fires only on success.
        if (subscription.awaitSubscriptionReady()) {
            pendingSubscriptionIds.remove(subscriptionId);
            onSubscriptionStarted.accept(subscriptionId);
        }
    }