in src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java [182:217]
/**
 * Hands a single ref update to every configured source and waits for the calls to finish.
 *
 * <p>While {@code running} is false the update is not dispatched; it is stored in {@code
 * beforeStartupEventsQueue} as a {@link ReferenceUpdatedEvent} and the method returns.
 *
 * <p>Otherwise the consumer built by {@code callFunction(...)} is applied to all sources in
 * parallel on a dedicated {@link ForkJoinPool} with one worker per source. The calling thread
 * blocks for at most {@code fetchCallsTimeout} milliseconds. Interruption, task failure and
 * timeout are reported through {@code stateLog} and are not rethrown.
 *
 * @param project project the updated ref belongs to
 * @param objectId object id forwarded with the update
 * @param refName name of the updated ref
 * @param isDelete delete flag forwarded with the update
 * @param state replication state passed to the per-source call and to the state log
 */
private void fire(
    Project.NameKey project,
    ObjectId objectId,
    String refName,
    boolean isDelete,
    ReplicationState state) {
  if (!running) {
    stateLog.warn(
        "Replication plugin did not finish startup before event, event replication is postponed",
        state);
    beforeStartupEventsQueue.add(
        ReferenceUpdatedEvent.create(project.get(), refName, objectId, isDelete));
    return;
  }

  // Take one snapshot so the pool size and the streamed collection cannot disagree.
  java.util.Collection<Source> allSources = sources.get().getAll();
  if (allSources.isEmpty()) {
    // ForkJoinPool rejects a parallelism of 0 with IllegalArgumentException, and there is
    // nothing to dispatch anyway.
    return;
  }

  ForkJoinPool fetchCallsPool = new ForkJoinPool(allSources.size());
  try {
    Consumer<Source> callFunction = callFunction(project, objectId, refName, isDelete, state);
    fetchCallsPool
        .submit(() -> allSources.parallelStream().forEach(callFunction))
        .get(fetchCallsTimeout, TimeUnit.MILLISECONDS);
  } catch (InterruptedException | ExecutionException | TimeoutException e) {
    if (e instanceof InterruptedException) {
      // Restore the interrupt status so callers further up the stack can observe it.
      Thread.currentThread().interrupt();
    }
    stateLog.error(
        String.format(
            "Exception during the pull replication fetch rest api call. Message:%s",
            e.getMessage()),
        e,
        state);
  } finally {
    // shutdown() stops accepting new work; tasks that were already submitted are still executed.
    fetchCallsPool.shutdown();
  }
}