public synchronized void subscribe()

in aws-android-sdk-appsync/src/main/java/com/amazonaws/mobileconnectors/appsync/subscription/RealSubscriptionManager.java [180:311]


    public synchronized  <T> void subscribe(
            @Nonnull Subscription<?, T, ?> subscription,
            @Nonnull final List<String> newTopics,
            @Nonnull SubscriptionResponse response,
            ResponseNormalizer<Map<String, Object>> mapResponseNormalizer) {
        Log.v(TAG, "Subscription Infrastructure: subscribe called for " + subscription);

        //Look up from or register subscription in the subscriptionsById map.
        SubscriptionObject subscriptionObject = getSubscriptionObjectFromIdMap(subscription);
        if ( subscriptionObject == null ) {
            subscriptionObject = createAndAddSubscriptionObjectToIdMap(subscription);
        }
        subscriptionObject.subscription = subscription;
        subscriptionObject.normalizer = mapResponseNormalizer;
        subscriptionObject.scalarTypeAdapters = this.scalarTypeAdapters;

        //Add the new topics to this Subscription Object
        //and add the subscriptions to the topic map.
        for (String topic : newTopics) {
            subscriptionObject.topics.add(topic);
            addSubscriptionObjectToTopic(topic, subscriptionObject);
        }

        final CountDownLatch allClientsConnectedLatch = new CountDownLatch(response.mqttInfos.size());

        // Create new clients, connections, and subscriptions
        final List<SubscriptionClient> newClients = new ArrayList<>();
        Log.v(TAG, "Subscription Infrastructure: Attempting to make [" + response.mqttInfos.size() + "] MQTT clients]");
        final Set<String> topicSet = subscriptionsByTopic.keySet();
        //Clear the topic Connection map
        topicConnectionMap.clear();

        //Add delay to allow for the server side propagation of the Connection URLs
        try {
            Thread.sleep(1 * 1000);
        }catch (Exception e) {
            Log.v(TAG, "Subscription Infrastructure: Thread.sleep for server propagation delay was interrupted");
        }

        for (final SubscriptionResponse.MqttInfo info : response.mqttInfos) {

            //Check if this MQTT connection meta data has at least one topic that we have a subscription for
            boolean noSubscriptionsFoundForTopicMetadata = true;
            for (String topic: info.topics) {
                if (topicSet.contains(topic)) {
                    noSubscriptionsFoundForTopicMetadata = false;
                }
            }

            //If this connection doesn't contain any topics we are interested in, don't connect.
            if (noSubscriptionsFoundForTopicMetadata) {
                allClientsConnectedLatch.countDown();
                continue;
            }

            final MqttSubscriptionClient mqttClient = new MqttSubscriptionClient(this.applicationContext, info.wssURL, info.clientId);
            // Silence new clients until swapped with old clients
            mqttClient.setTransmitting(false);
            Log.v(TAG, "Subscription Infrastructure: Connecting with Client ID[" + info.clientId +"]");
            mqttClient.connect(new SubscriptionClientCallback() {
                @Override
                public void onConnect() {
                    if (subscriptionsAutoReconnect) {
                        reportSuccessfulConnection();
                    }
                    Log.v(TAG, String.format("Subscription Infrastructure: Connection successful for clientID [" + info.clientId + "]. Will subscribe up to %d topics", info.topics.length));
                    for (String topic : info.topics) {
                        if (topicSet.contains(topic)) {
                            Log.v(TAG, String.format("Subscription Infrastructure: Subscribing to MQTT topic:[%s]", topic));
                            mqttClient.subscribe(topic, 1, mainMessageCallback);
                            topicConnectionMap.put(topic, mqttClient);
                        }
                    }
                    newClients.add(mqttClient);
                    allClientsConnectedLatch.countDown();
                }

                @Override
                public void onError(Exception e) {
                    Log.v(TAG, "Subscription Infrastructure: onError called "  + e);
                    if (subscriptionsAutoReconnect) {
                        if ( e instanceof SubscriptionDisconnectedException ) {
                            Log.v(TAG, "Subscription Infrastructure: Disconnect received. Unexpected - Initiating reconnect sequence.");
                            reportConnectionError();
                            initiateReconnectSequence();
                            return;
                        }
                    }
                    //Propagate connection error
                    for (String topic: info.topics) {
                        if ( getSubscriptionObjectSetFromTopicMap(topic) != null ) {
                            for (SubscriptionObject subscriptionObject : getSubscriptionObjectSetFromTopicMap(topic)) {
                                subscriptionObject.onFailure(new ApolloException("Connection Error Reported", e));
                            }
                        }
                    }
                    allClientsConnectedLatch.countDown();
                }
            });
        }


        try {
            allClientsConnectedLatch.await();
            Log.v(TAG, "Subscription Infrastructure: Made [" + newClients.size() + "] MQTT clients");

            Log.v(TAG, "Subscription Infrastructure: Unmuting the new clients [" + newClients.size() + "] in total");
            // Unmute new clients.
            for (final SubscriptionClient client : newClients) {
                client.setTransmitting(true);
            }

            // Silence the old clients
            Log.v(TAG, "Subscription Infrastructure: Muting the old clients [ " + clients.size() + "] in total");
            for (final SubscriptionClient client : clients) {
                client.setTransmitting(false);
            }

            // Close old clients
            Log.v(TAG, "Subscription Infrastructure: Closing the old clients [" + clients.size() + "] in total");
            for (final SubscriptionClient client : clients) {
                Log.v(TAG, "Subscription Infrastructure: Closing client: " + client);
                client.close();
            }

            //Add the new clients
            clients.clear();
            clients.addAll(newClients);
        } catch (InterruptedException e) {
            throw new RuntimeException("Subscription Infrastructure: Failed to wait for all clients to finish connecting.", e);
        }
    }