in src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java [74:112]
public ListenableFuture<Boolean> send(String topic, Event message) {
SettableFuture<Boolean> resultFuture = SettableFuture.create();
if (!nodeInstanceId.equals(message.instanceId)) {
resultFuture.set(true);
return resultFuture;
}
if (Strings.isNullOrEmpty(message.instanceId)) {
log.warn(
"Dropping event '{}' because event instance id cannot be null or empty",
message.toString());
resultFuture.set(true);
return resultFuture;
}
ListenableFuture<Boolean> resfultF = apiDelegate.get().send(topic, message);
Futures.addCallback(
resfultF,
new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
msgLog.log(Direction.PUBLISH, topic, message);
metrics.incrementBrokerPublishedMessage();
}
@Override
public void onFailure(Throwable throwable) {
log.error(
"Failed to publish message '{}' to topic '{}' - error: {}",
message.toString(),
topic,
throwable.getMessage());
metrics.incrementBrokerFailedToPublishMessage();
}
},
executor);
return resfultF;
}