private void synchronizePendingEvents()

in src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java [202:243]


  private void synchronizePendingEvents(Prune prune) {
    if (replaying.compareAndSet(false, true)) {
      final Map<ReplicateRefUpdate, String> taskNamesByReplicateRefUpdate = new ConcurrentHashMap<>();
      if (Prune.TRUE.equals(prune)) {
        for (Destination destination : destinations.get().getAll(FilterType.ALL)) {
          taskNamesByReplicateRefUpdate.putAll(destination.getTaskNamesByReplicateRefUpdate());
        }
      }
      new ChainedScheduler.StreamScheduler<>(
          workQueue.getDefaultQueue(),
          replicationTasksStorage.streamWaiting(),
          new ChainedScheduler.Runner<ReplicationTasksStorage.ReplicateRefUpdate>() {
            @Override
            public void run(ReplicationTasksStorage.ReplicateRefUpdate u) {
              try {
                fire(new URIish(u.uri()), Project.nameKey(u.project()), u.ref());
                if (Prune.TRUE.equals(prune)) {
                  taskNamesByReplicateRefUpdate.remove(u);
                }
              } catch (URISyntaxException e) {
                repLog.atSevere().withCause(e).log(
                    "Encountered malformed URI for persisted event %s", u);
              } catch (Throwable e) {
                repLog.atSevere().withCause(e).log("Unexpected error while firing pending events");
              }
            }

            @Override
            public void onDone() {
              if (Prune.TRUE.equals(prune)) {
                pruneNoLongerPending(new HashSet<>(taskNamesByReplicateRefUpdate.values()));
              }
              replaying.set(false);
            }

            @Override
            public String toString(ReplicationTasksStorage.ReplicateRefUpdate u) {
              return "Scheduling push to " + String.format("%s:%s", u.project(), u.ref());
            }
          });
    }
  }