public MessagePublisher()

in src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java [54:128]


  /**
   * Builds a publisher configured by {@code properties}.
   *
   * <p>Besides storing its arguments, the constructor creates the session through the session
   * factory and sets up two anonymous collaborators that communicate through {@code queue}:
   *
   * <ul>
   *   <li>{@code eventListener} offers every incoming event to the queue and keeps count of the
   *       events that were dropped because the offer was rejected.
   *   <li>{@code publisher} is the runnable that takes events off the queue, waits until {@code
   *       isConnected()} reports a connection and then hands them to {@code publishEvent}.
   * </ul>
   *
   * @param properties configuration of this publisher; used to create the session and to build
   *     the label returned by the publisher's {@code toString()}
   * @param sessionFactoryProvider supplies the factory that creates the session
   * @param gson JSON serializer; within this constructor it renders events that are reported as
   *     lost
   */
  public MessagePublisher(
      @Assisted final Properties properties,
      SessionFactoryProvider sessionFactoryProvider,
      Gson gson) {
    this.session = sessionFactoryProvider.get().create(properties);
    this.properties = properties;
    this.gson = gson;
    this.eventListener =
        new EventListener() {
          // Number of events dropped since the queue last accepted one.
          private int lostEventCount = 0;

          /**
           * Enqueues {@code event} for publishing, making sure a publisher thread is alive first.
           * A rejected offer is counted as a lost event.
           */
          @Override
          public void onEvent(Event event) {
            if (!publisherThread.isAlive()) {
              ensurePublisherThreadStarted();
            }

            if (queue.offer(event)) {
              if (lostEventCount > 0) {
                // The queue accepts events again: report the size of the gap once and reset.
                logger.atWarning().log(
                    "Event queue is no longer full, %d events were lost", lostEventCount);
                lostEventCount = 0;
              }
            } else {
              // Log only the 1st, 11th, 21st, ... drop so a full queue does not flood the log.
              // The post-increment means the logged count already includes this event.
              if (lostEventCount++ % 10 == 0) {
                logger.atSevere().log("Event queue is full, lost %d event(s)", lostEventCount);
              }
            }
          }
        };
    this.publisher =
        new GracefullyCancelableRunnable() {

          // Written by cancel() and read by run(), possibly on different threads.
          volatile boolean canceled = false;

          /**
           * Takes events off the queue and publishes them until {@link #cancel()} is called.
           * Events that cannot be published are offered back to the queue.
           */
          @Override
          public void run() {
            while (!canceled) {
              // Event that has been taken off the queue but not yet handed to publishEvent.
              // Tracked so that an interrupt while waiting for a connection cannot drop it.
              Event pending = null;
              try {
                Event event = queue.take();
                if (event.getType().equals(END_OF_STREAM)) {
                  // Marker enqueued by cancel() to unblock take(); nothing to publish.
                  continue;
                }
                pending = event;
                // Poll for a connection; the one-second timeout bounds the delay before a
                // cancel() or a missed notification on sessionMon is noticed.
                while (!isConnected() && !canceled) {
                  synchronized (sessionMon) {
                    sessionMon.wait(1000);
                  }
                }
                // From here on the publish path below is responsible for the event.
                pending = null;
                if (!publishEvent(event) && !queue.offer(event)) {
                  logger.atSevere().log("Event lost: %s", gson.toJson(event));
                }
              } catch (InterruptedException e) {
                logger.atWarning().withCause(e).log(
                    "Interupted while waiting for event or connection.");
                // Put an in-flight event back instead of silently dropping it; if the queue
                // refuses it, report the loss the same way a failed publish does.
                if (pending != null && !queue.offer(pending)) {
                  logger.atSevere().log("Event lost: %s", gson.toJson(pending));
                }
                // The interrupt status is not restored on purpose: this loop keeps running until
                // cancel() is called, and a set flag would make the next take() fail immediately.
              }
            }
          }

          /**
           * Asks {@link #run()} to stop. If the queue is empty an end-of-stream marker is
           * enqueued so that a {@code run()} blocked in {@code take()} wakes up.
           */
          @Override
          public void cancel() {
            canceled = true;
            if (queue.isEmpty()) {
              queue.offer(EOS);
            }
          }

          /** Returns a label built from the configured {@code listenAs} value and AMQP URI. */
          @Override
          public String toString() {
            return "Rabbitmq publisher: "
                + properties.getSection(Gerrit.class).listenAs
                + "-"
                + properties.getSection(AMQP.class).uri;
          }
        };
  }