in core/src/main/java/flex/messaging/MessageClient.java [685:769]
/**
 * Invalidates this MessageClient.
 * <p>
 * In order, this method: optionally pushes a subscription-invalidation command to the
 * client, notifies the registered destroyed-listeners, sends one unsubscribe command per
 * remaining subscription to the destination's service, releases this instance from the
 * destination's subscription manager if it was registered, and finally marks the instance
 * as no longer valid.
 * <p>
 * Only the first caller performs the shutdown; a call made while an invalidation is in
 * progress, or after one has finished, returns immediately. The valid/invalidating state
 * is finalised even when the teardown is aborted by an unchecked exception, which is still
 * propagated to the caller.
 *
 * @param notifyClient {@code true} to attempt to push a subscription-invalidation message
 *                     to the client before the subscriptions are torn down.
 */
public void invalidate(boolean notifyClient) {
    synchronized (lock) {
        if (!valid || invalidating) {
            return; // Already shut down, or another thread is shutting down.
        }
        invalidating = true; // This thread gets to shut the MessageClient down.
        cancelTimeout();
    }

    try {
        // Record whether we're attempting to notify the client or not.
        attemptingInvalidationClientNotification = notifyClient;

        // Build a subscription invalidation message and push to the client if it is still valid.
        if (notifyClient && flexClient != null && flexClient.isValid()) {
            CommandMessage msg = new CommandMessage();
            msg.setDestination(destination.getId());
            msg.setClientId(clientId);
            msg.setOperation(CommandMessage.SUBSCRIPTION_INVALIDATE_OPERATION);
            Set subscriberIds = new TreeSet();
            subscriberIds.add(clientId);
            try {
                if (destination instanceof MessageDestination) {
                    MessageDestination msgDestination = (MessageDestination) destination;
                    ((MessageService) msgDestination.getService()).pushMessageToClients(msgDestination, subscriberIds, msg, false /* don't eval selector */);
                }
            } catch (MessageException me) {
                // Notification is best effort: report the failure the same way failed
                // unsubscribes are reported below, then carry on with the invalidation.
                if (Log.isDebug()) {
                    Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug("MessageClient: " + getClientId() + " failed to push a subscription invalidation message to the client during invalidation but will continue with invalidation. Reason: " + ExceptionUtil.toString(me));
                }
            }
        }

        // Notify messageClientDestroyed listeners that we're being invalidated.
        if (destroyedListeners != null && !destroyedListeners.isEmpty()) {
            for (Iterator iter = destroyedListeners.iterator(); iter.hasNext(); ) {
                ((MessageClientListener) iter.next()).messageClientDestroyed(this);
            }
            destroyedListeners.clear();
        }

        // And generate unsubscribe messages for all of the MessageClient's subscriptions and
        // route them to the destination this MessageClient is subscribed to.
        // The reason we send a message to the service rather than just going straight to the SubscriptionManager
        // is that some adapters manage their own subscription state (i.e. JMS) in addition to us keeping track of
        // things with our SubscriptionManager.
        ArrayList<CommandMessage> unsubMessages = new ArrayList<CommandMessage>();
        synchronized (lock) {
            // Only build the messages while holding the lock; they are sent after it is released.
            for (SubscriptionInfo subInfo : subscriptions) {
                CommandMessage unsubMessage = new CommandMessage();
                unsubMessage.setDestination(destination.getId());
                unsubMessage.setClientId(clientId);
                unsubMessage.setOperation(CommandMessage.UNSUBSCRIBE_OPERATION);
                unsubMessage.setHeader(CommandMessage.SUBSCRIPTION_INVALIDATED_HEADER, Boolean.TRUE);
                unsubMessage.setHeader(CommandMessage.SELECTOR_HEADER, subInfo.selector);
                unsubMessage.setHeader(AsyncMessage.SUBTOPIC_HEADER_NAME, subInfo.subtopic);
                unsubMessages.add(unsubMessage);
            }
        }

        // Release the lock and send the unsub messages.
        for (CommandMessage unsubMessage : unsubMessages) {
            try {
                destination.getService().serviceCommand(unsubMessage);
            } catch (MessageException me) {
                if (Log.isDebug()) {
                    Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug("MessageClient: " + getClientId() + " issued an unsubscribe message during invalidation that was not processed but will continue with invalidation. Reason: " + ExceptionUtil.toString(me));
                }
            }
        }

        synchronized (lock) {
            // If we didn't clean up all subscriptions log an error and continue with shutdown.
            int remainingSubscriptionCount = subscriptions.size();
            if (remainingSubscriptionCount > 0 && Log.isError()) {
                Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).error("MessageClient: " + getClientId() + " failed to remove " + remainingSubscriptionCount + " subscription(s) during invalidation");
            }
        }

        // If someone registered this message client, invalidating it will free
        // their reference which will typically also remove this message client.
        if (registered && destination instanceof MessageDestination) {
            ((MessageDestination) destination).getSubscriptionManager().releaseMessageClient(this);
        }
    } finally {
        // Always leave the "invalidating" state. Without this, an unchecked exception thrown
        // by a listener or by the service would leave invalidating == true and valid == true,
        // and the guard at the top would turn every later invalidate() call into a no-op.
        synchronized (lock) {
            valid = false;
            invalidating = false;
        }
    }

    // Reached only when the teardown above completed without throwing.
    if (Log.isDebug()) {
        Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug("MessageClient with clientId '" + clientId + "' for destination '" + destinationId + "' has been invalidated.");
    }
}