in aws-android-sdk-appsync/src/main/java/com/amazonaws/mobileconnectors/appsync/subscription/RealSubscriptionManager.java [180:311]
public synchronized <T> void subscribe(
@Nonnull Subscription<?, T, ?> subscription,
@Nonnull final List<String> newTopics,
@Nonnull SubscriptionResponse response,
ResponseNormalizer<Map<String, Object>> mapResponseNormalizer) {
Log.v(TAG, "Subscription Infrastructure: subscribe called for " + subscription);
//Look up from or register subscription in the subscriptionsById map.
SubscriptionObject subscriptionObject = getSubscriptionObjectFromIdMap(subscription);
if ( subscriptionObject == null ) {
subscriptionObject = createAndAddSubscriptionObjectToIdMap(subscription);
}
subscriptionObject.subscription = subscription;
subscriptionObject.normalizer = mapResponseNormalizer;
subscriptionObject.scalarTypeAdapters = this.scalarTypeAdapters;
//Add the new topics to this Subscription Object
//and add the subscriptions to the topic map.
for (String topic : newTopics) {
subscriptionObject.topics.add(topic);
addSubscriptionObjectToTopic(topic, subscriptionObject);
}
final CountDownLatch allClientsConnectedLatch = new CountDownLatch(response.mqttInfos.size());
// Create new clients, connections, and subscriptions
final List<SubscriptionClient> newClients = new ArrayList<>();
Log.v(TAG, "Subscription Infrastructure: Attempting to make [" + response.mqttInfos.size() + "] MQTT clients]");
final Set<String> topicSet = subscriptionsByTopic.keySet();
//Clear the topic Connection map
topicConnectionMap.clear();
//Add delay to allow for the server side propagation of the Connection URLs
try {
Thread.sleep(1 * 1000);
}catch (Exception e) {
Log.v(TAG, "Subscription Infrastructure: Thread.sleep for server propagation delay was interrupted");
}
for (final SubscriptionResponse.MqttInfo info : response.mqttInfos) {
//Check if this MQTT connection meta data has at least one topic that we have a subscription for
boolean noSubscriptionsFoundForTopicMetadata = true;
for (String topic: info.topics) {
if (topicSet.contains(topic)) {
noSubscriptionsFoundForTopicMetadata = false;
}
}
//If this connection doesn't contain any topics we are interested in, don't connect.
if (noSubscriptionsFoundForTopicMetadata) {
allClientsConnectedLatch.countDown();
continue;
}
final MqttSubscriptionClient mqttClient = new MqttSubscriptionClient(this.applicationContext, info.wssURL, info.clientId);
// Silence new clients until swapped with old clients
mqttClient.setTransmitting(false);
Log.v(TAG, "Subscription Infrastructure: Connecting with Client ID[" + info.clientId +"]");
mqttClient.connect(new SubscriptionClientCallback() {
@Override
public void onConnect() {
if (subscriptionsAutoReconnect) {
reportSuccessfulConnection();
}
Log.v(TAG, String.format("Subscription Infrastructure: Connection successful for clientID [" + info.clientId + "]. Will subscribe up to %d topics", info.topics.length));
for (String topic : info.topics) {
if (topicSet.contains(topic)) {
Log.v(TAG, String.format("Subscription Infrastructure: Subscribing to MQTT topic:[%s]", topic));
mqttClient.subscribe(topic, 1, mainMessageCallback);
topicConnectionMap.put(topic, mqttClient);
}
}
newClients.add(mqttClient);
allClientsConnectedLatch.countDown();
}
@Override
public void onError(Exception e) {
Log.v(TAG, "Subscription Infrastructure: onError called " + e);
if (subscriptionsAutoReconnect) {
if ( e instanceof SubscriptionDisconnectedException ) {
Log.v(TAG, "Subscription Infrastructure: Disconnect received. Unexpected - Initiating reconnect sequence.");
reportConnectionError();
initiateReconnectSequence();
return;
}
}
//Propagate connection error
for (String topic: info.topics) {
if ( getSubscriptionObjectSetFromTopicMap(topic) != null ) {
for (SubscriptionObject subscriptionObject : getSubscriptionObjectSetFromTopicMap(topic)) {
subscriptionObject.onFailure(new ApolloException("Connection Error Reported", e));
}
}
}
allClientsConnectedLatch.countDown();
}
});
}
try {
allClientsConnectedLatch.await();
Log.v(TAG, "Subscription Infrastructure: Made [" + newClients.size() + "] MQTT clients");
Log.v(TAG, "Subscription Infrastructure: Unmuting the new clients [" + newClients.size() + "] in total");
// Unmute new clients.
for (final SubscriptionClient client : newClients) {
client.setTransmitting(true);
}
// Silence the old clients
Log.v(TAG, "Subscription Infrastructure: Muting the old clients [ " + clients.size() + "] in total");
for (final SubscriptionClient client : clients) {
client.setTransmitting(false);
}
// Close old clients
Log.v(TAG, "Subscription Infrastructure: Closing the old clients [" + clients.size() + "] in total");
for (final SubscriptionClient client : clients) {
Log.v(TAG, "Subscription Infrastructure: Closing client: " + client);
client.close();
}
//Add the new clients
clients.clear();
clients.addAll(newClients);
} catch (InterruptedException e) {
throw new RuntimeException("Subscription Infrastructure: Failed to wait for all clients to finish connecting.", e);
}
}