in src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java [215:240]
/**
 * Waits for the pending and in-flight replication events of {@code destination} to drain.
 *
 * <p>The queue sizes are polled up to {@code destination.getDrainQueueAttempts()} times, sleeping
 * {@code destination.getReplicationDelayMilliseconds()} between polls. When the number of
 * attempts is {@code 0} the method returns immediately without inspecting the queue.
 *
 * <p>If the waiting thread is interrupted, its interrupt status is restored and the wait is
 * abandoned early; the queue sizes are still re-read once so the outcome reflects the latest
 * state.
 *
 * @param destination the destination whose replication queue should be drained
 * @throws EventQueueNotEmptyException if pending or in-flight events remain once the attempts are
 *     exhausted or the wait has been interrupted
 */
void drainReplicationEvents(Destination destination) throws EventQueueNotEmptyException {
  int drainQueueAttempts = destination.getDrainQueueAttempts();
  if (drainQueueAttempts == 0) {
    return;
  }
  int pending = destination.getQueueInfo().pending.size();
  int inFlight = destination.getQueueInfo().inFlight.size();
  boolean interrupted = false;
  while ((inFlight > 0 || pending > 0) && drainQueueAttempts > 0) {
    logger.atInfo().log(
        "Draining replication events, postpone shutdown. Events left: inFlight %d, pending %d",
        inFlight, pending);
    try {
      Thread.sleep(destination.getReplicationDelayMilliseconds());
    } catch (InterruptedException ie) {
      logger.atWarning().withCause(ie).log(
          "Wait for replication events to drain has been interrupted");
      // Restore the interrupt status so callers further up the stack can observe it.
      Thread.currentThread().interrupt();
      interrupted = true;
    }
    pending = destination.getQueueInfo().pending.size();
    inFlight = destination.getQueueInfo().inFlight.size();
    if (interrupted) {
      // With the interrupt flag set every further sleep would fail immediately, so looping on
      // would only burn through the remaining attempts without actually waiting.
      break;
    }
    drainQueueAttempts--;
  }
  if (pending > 0 || inFlight > 0) {
    throw new EventQueueNotEmptyException(
        String.format("Pending: %d - InFlight: %d", pending, inFlight));
  }
}