in src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java [202:243]
/**
 * Re-fires every replication event that the task storage reports as waiting and, when requested,
 * prunes tasks that no longer correspond to a waiting event.
 *
 * <p>The {@code replaying} flag ensures that only one synchronization is in flight: if the flag
 * is already set this method returns immediately without doing anything. The flag is cleared
 * again when the scheduler reports completion via {@code onDone()}, or right away if setting up
 * the scheduler fails, so that a failure cannot permanently disable later synchronizations.
 *
 * <p>When pruning is enabled, the task names of all destinations are snapshotted up front, every
 * event that is fired successfully is removed from that snapshot, and whatever is left once the
 * stream is finished is passed to {@code pruneNoLongerPending}.
 *
 * @param prune {@code Prune.TRUE} to prune leftover tasks after replaying; any other value only
 *     replays the waiting events.
 */
private void synchronizePendingEvents(Prune prune) {
  if (!replaying.compareAndSet(false, true)) {
    // A synchronization is already in progress; do not start a second one.
    return;
  }

  final boolean pruneEnabled = Prune.TRUE.equals(prune);
  try {
    // NOTE(review): a concurrent map is used presumably because the scheduler callbacks may run
    // on work-queue threads - confirm against ChainedScheduler.
    final Map<ReplicateRefUpdate, String> taskNamesByReplicateRefUpdate =
        new ConcurrentHashMap<>();
    if (pruneEnabled) {
      for (Destination destination : destinations.get().getAll(FilterType.ALL)) {
        taskNamesByReplicateRefUpdate.putAll(destination.getTaskNamesByReplicateRefUpdate());
      }
    }

    new ChainedScheduler.StreamScheduler<>(
        workQueue.getDefaultQueue(),
        replicationTasksStorage.streamWaiting(),
        new ChainedScheduler.Runner<ReplicationTasksStorage.ReplicateRefUpdate>() {
          @Override
          public void run(ReplicationTasksStorage.ReplicateRefUpdate u) {
            try {
              fire(new URIish(u.uri()), Project.nameKey(u.project()), u.ref());
              if (pruneEnabled) {
                // Only reached when fire() succeeded: this update is still pending, so its
                // task must survive the pruning done in onDone().
                taskNamesByReplicateRefUpdate.remove(u);
              }
            } catch (URISyntaxException e) {
              repLog.atSevere().withCause(e).log(
                  "Encountered malformed URI for persisted event %s", u);
            } catch (Throwable e) {
              // Deliberately best-effort: one failing event must not abort the whole replay.
              repLog.atSevere().withCause(e).log("Unexpected error while firing pending events");
            }
          }

          @Override
          public void onDone() {
            try {
              if (pruneEnabled) {
                pruneNoLongerPending(new HashSet<>(taskNamesByReplicateRefUpdate.values()));
              }
            } finally {
              // Always release the guard, even if pruning throws; otherwise every later
              // synchronization would be skipped silently.
              replaying.set(false);
            }
          }

          @Override
          public String toString(ReplicationTasksStorage.ReplicateRefUpdate u) {
            return "Scheduling push to " + String.format("%s:%s", u.project(), u.ref());
          }
        });
  } catch (RuntimeException | Error e) {
    // Setup failed before the scheduler took over, so onDone() may never run; release the
    // guard here and let the original failure propagate unchanged.
    replaying.set(false);
    throw e;
  }
}