in src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java [54:128]
public MessagePublisher(
    @Assisted final Properties properties,
    SessionFactoryProvider sessionFactoryProvider,
    Gson gson) {
  // Wires up the publisher: creates the session from the given properties, then builds the
  // listener that enqueues incoming events and the runnable that drains the queue.
  //
  //   properties             - handed to the session factory; also read by the runnable's
  //                            toString().
  //   sessionFactoryProvider - supplies the factory that creates the session.
  //   gson                   - serializes an event for the "Event lost" log entry.
  this.session = sessionFactoryProvider.get().create(properties);
  this.properties = properties;
  this.gson = gson;

  this.eventListener =
      new EventListener() {
        // Number of events the queue has rejected since the last successful offer.
        // NOTE(review): plain int without synchronization - confirm onEvent() is never
        // invoked from several threads at once.
        private int lostEventCount = 0;

        @Override
        public void onEvent(Event event) {
          if (!publisherThread.isAlive()) {
            ensurePublisherThreadStarted();
          }
          if (queue.offer(event)) {
            if (lostEventCount > 0) {
              // The queue accepted an event again: report the total once and reset.
              logger.atWarning().log(
                  "Event queue is no longer full, %d events were lost", lostEventCount);
              lostEventCount = 0;
            }
          } else {
            // Throttle: only the 1st, 11th, 21st, ... rejected event is logged. The counter
            // is incremented before it is logged, so the message shows 1, 11, 21, ...
            if (lostEventCount++ % 10 == 0) {
              logger.atSevere().log("Event queue is full, lost %d event(s)", lostEventCount);
            }
          }
        }
      };

  this.publisher =
      new GracefullyCancelableRunnable() {
        // Set by cancel(); volatile so the loop in run() observes the change.
        volatile boolean canceled = false;

        @Override
        public void run() {
          while (!canceled) {
            try {
              Event event = queue.take();
              if (event.getType().equals(END_OF_STREAM)) {
                // Marker event: nothing to publish, just re-evaluate the loop condition.
                continue;
              }
              try {
                // Re-check the connection at least once per second until it is available
                // or the runnable is canceled.
                while (!isConnected() && !canceled) {
                  synchronized (sessionMon) {
                    sessionMon.wait(1000);
                  }
                }
              } catch (InterruptedException e) {
                // The event has already been removed from the queue. Put it back so an
                // interruption during the wait does not silently drop it; report the loss
                // if the queue cannot take it. Rethrow so the outer handler logs the
                // interruption as before.
                if (!queue.offer(event)) {
                  logger.atSevere().log("Event lost: %s", gson.toJson(event));
                }
                throw e;
              }
              // A failed publish is re-queued; the event is only lost when the queue
              // rejects it as well.
              if (!publishEvent(event) && !queue.offer(event)) {
                logger.atSevere().log("Event lost: %s", gson.toJson(event));
              }
            } catch (InterruptedException e) {
              // The loop is controlled by 'canceled', not by the interrupt flag, so the
              // interruption is logged and the loop continues.
              logger.atWarning().withCause(e).log(
                  "Interupted while waiting for event or connection.");
            }
          }
        }

        @Override
        public void cancel() {
          canceled = true;
          // With an empty queue run() may be sitting in queue.take(); offering EOS gives
          // it something to take so it can see 'canceled'.
          // NOTE(review): presumably EOS carries the END_OF_STREAM type - confirm.
          if (queue.isEmpty()) {
            queue.offer(EOS);
          }
        }

        @Override
        public String toString() {
          return "Rabbitmq publisher: "
              + properties.getSection(Gerrit.class).listenAs
              + "-"
              + properties.getSection(AMQP.class).uri;
        }
      };
}