in java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java [633:711]
/**
 * Gets the route data of the given topic, sharing one fetch among concurrent callers.
 *
 * <p>Resolution order:
 * <ol>
 *   <li>If {@code topicRouteCache} holds an entry for the topic, the returned future is already
 *       completed with it.</li>
 *   <li>Otherwise, if a fetch for the topic is registered in {@code inflightRouteFutureTable}, the
 *       returned future joins it and is completed when that fetch finishes.</li>
 *   <li>Otherwise a new fetch is started through {@code fetchTopicRoute} and its outcome (value or
 *       failure) is propagated to every future registered for the topic in the meantime.</li>
 * </ol>
 *
 * <p>{@code inflightRouteFutureLock} guards {@code inflightRouteFutureTable} only. Pending futures
 * are always completed after the lock has been released, so listeners attached by callers never run
 * while the lock is held.
 *
 * @param topic topic whose route data is requested.
 * @return future completed with the route data, or failed with the cause of the fetch failure.
 */
protected ListenableFuture<TopicRouteData> getRouteData(final String topic) {
    final SettableFuture<TopicRouteData> future0 = SettableFuture.create();
    TopicRouteData topicRouteData = topicRouteCache.get(topic);
    // If route result was cached before, get it directly.
    if (null != topicRouteData) {
        future0.set(topicRouteData);
        return future0;
    }
    inflightRouteFutureLock.lock();
    try {
        // Re-check under the lock: the cache may have been filled since the unlocked lookup above
        // (presumably by a fetch that has just finished; the cache writer is not visible here).
        topicRouteData = topicRouteCache.get(topic);
        if (null != topicRouteData) {
            // future0 has not escaped yet, so no listener can run under the lock here.
            future0.set(topicRouteData);
            return future0;
        }
        Set<SettableFuture<TopicRouteData>> inflightFutures = inflightRouteFutureTable.get(topic);
        // Request is in-flight, join it and return the future directly.
        if (null != inflightFutures) {
            inflightFutures.add(future0);
            return future0;
        }
        // First requester of this topic: register the waiter set before starting the fetch.
        inflightFutures = new HashSet<>();
        inflightFutures.add(future0);
        inflightRouteFutureTable.put(topic, inflightFutures);
    } finally {
        inflightRouteFutureLock.unlock();
    }
    final ListenableFuture<TopicRouteData> future;
    try {
        future = fetchTopicRoute(topic);
    } catch (RuntimeException | Error e) {
        // The fetch never started, so no callback will ever clean up the registration made above.
        // Drop it and fail the waiters, otherwise later callers would join a future that never
        // completes. The exception is still rethrown, as it was before.
        final Set<SettableFuture<TopicRouteData>> waiters = detachInflightRouteFutures(topic);
        if (null != waiters) {
            for (SettableFuture<TopicRouteData> waiter : waiters) {
                waiter.setException(e);
            }
        }
        throw e;
    }
    Futures.addCallback(future, new FutureCallback<TopicRouteData>() {
        @Override
        public void onSuccess(TopicRouteData routeData) {
            try {
                final Set<SettableFuture<TopicRouteData>> waiters = detachInflightRouteFutures(topic);
                if (null == waiters) {
                    // Should never reach here.
                    log.error("[Bug] in-flight route futures was empty, topic={}, clientId={}", topic,
                        clientId);
                    return;
                }
                log.debug("Fetch topic route successfully, topic={}, in-flight route future "
                    + "size={}, clientId={}", topic, waiters.size(), clientId);
                // The set is detached from the table, so nobody else can mutate it anymore.
                for (SettableFuture<TopicRouteData> waiter : waiters) {
                    waiter.set(routeData);
                }
            } catch (Throwable t) {
                // Should never reach here.
                log.error("[Bug] Exception raised while update route data, topic={}, clientId={}", topic,
                    clientId, t);
            }
        }

        @Override
        public void onFailure(Throwable t) {
            final Set<SettableFuture<TopicRouteData>> waiters = detachInflightRouteFutures(topic);
            if (null == waiters) {
                // Should never reach here.
                log.error("[Bug] in-flight route futures was empty, topic={}, clientId={}", topic, clientId);
                return;
            }
            log.debug("Failed to fetch topic route, topic={}, in-flight route future " +
                "size={}, clientId={}", topic, waiters.size(), clientId, t);
            for (SettableFuture<TopicRouteData> waiter : waiters) {
                waiter.setException(t);
            }
        }
    }, MoreExecutors.directExecutor());
    return future0;
}

/**
 * Removes and returns the futures waiting for the route of the given topic.
 *
 * <p>Only the table update happens under {@code inflightRouteFutureLock}; the caller completes the
 * returned futures after this method has released the lock.
 *
 * @param topic topic whose in-flight registration should be dropped.
 * @return the detached futures, or {@code null} if nothing was registered for the topic.
 */
private Set<SettableFuture<TopicRouteData>> detachInflightRouteFutures(String topic) {
    inflightRouteFutureLock.lock();
    try {
        return inflightRouteFutureTable.remove(topic);
    } finally {
        inflightRouteFutureLock.unlock();
    }
}