in src/main/java/build/buildfarm/worker/Pipeline.java [115:188]
/**
 * Blocks until {@code stageThreads} is empty, repeatedly joining each remaining stage thread and
 * removing the stages whose threads are no longer alive.
 *
 * <p>Each pass of the outer loop does the following:
 *
 * <ol>
 *   <li>If {@code closeStage} is set and no remaining stage reports {@code isClosed()}, the
 *       remaining stage with the highest value in {@code stageClosePriorities} (above -1) is
 *       closed.
 *   <li>Every remaining thread is joined with a timeout of 1 ms when {@code closeStage} is set,
 *       1000 ms otherwise. A thread that is still alive while its stage is closed is
 *       interrupted.
 *   <li>Stages whose threads exited are removed from {@code stageThreads} and waiters on this
 *       object's monitor are notified. {@code closeStage} becomes true for the next pass only if
 *       at least one stage was removed.
 * </ol>
 *
 * <p>Interrupt handling depends on {@code closeStage} at the time of the interrupt: when it is
 * false, this call waits on this object's monitor while {@code closing} is set and stages
 * remain, then rethrows; when it is true, the interrupt is remembered, the loop keeps going, and
 * the remembered exception is thrown once the loop ends.
 *
 * @param closeStage whether the first pass should close a stage; reassigned on every pass
 * @throws InterruptedException if the calling thread was interrupted while joining a stage
 *     thread or while waiting on this object's monitor
 */
private void join(boolean closeStage) throws InterruptedException {
  List<PipelineStage> inactiveStages = new ArrayList<>();
  // Interrupt received while closing stages; rethrown only after the loop has finished.
  InterruptedException intEx = null;
  try {
    while (!stageThreads.isEmpty()) {
      if (closeStage) {
        PipelineStage stageToClose = null;
        int maxPriority = -1;
        for (PipelineStage stage : stageThreads.keySet()) {
          if (stage.isClosed()) {
            // A remaining stage is already closed; do not close another one in this pass.
            stageToClose = null;
            break;
          }
          // Single lookup instead of fetching (and unboxing) the priority twice.
          int priority = stageClosePriorities.get(stage);
          if (priority > maxPriority) {
            maxPriority = priority;
            stageToClose = stage;
          }
        }
        if (stageToClose != null && !stageToClose.isClosed()) {
          logger.log(Level.FINE, "Closing stage at priority " + maxPriority);
          stageToClose.close();
        }
      }
      for (Map.Entry<PipelineStage, Thread> stageThread : stageThreads.entrySet()) {
        PipelineStage stage = stageThread.getKey();
        Thread thread = stageThread.getValue();
        try {
          // Short poll while closing so the next stage can be closed promptly.
          thread.join(closeStage ? 1 : 1000);
        } catch (InterruptedException e) {
          if (!closeStage) {
            synchronized (this) {
              // Woken by the notification issued below whenever a stage is removed.
              while (closing && !stageThreads.isEmpty()) {
                wait();
              }
            }
            throw e;
          }
          // Keep going through the stages; the interrupt is reported after the loop.
          intEx = e;
        }
        if (!thread.isAlive()) {
          logger.log(
              Level.FINE,
              "Stage "
                  + stage.name()
                  + " has exited at priority "
                  + stageClosePriorities.get(stage));
          // Removal is deferred so the map is not modified while it is being iterated.
          inactiveStages.add(stage);
        } else if (stage.isClosed()) {
          logger.log(
              Level.INFO,
              "Interrupting unterminated closed thread in stage "
                  + stage.name()
                  + " at priority "
                  + stageClosePriorities.get(stage));
          thread.interrupt();
        }
      }
      closeStage = false;
      for (PipelineStage stage : inactiveStages) {
        synchronized (this) {
          stageThreads.remove(stage);
          // A stage exited, so the next pass may close another one.
          closeStage = true;
          // notifyAll rather than notify: more than one thread can be parked in the wait()
          // loop above, and each of them re-checks its own predicate after waking up.
          notifyAll();
        }
      }
      inactiveStages.clear();
    }
  } catch (InterruptedException | RuntimeException | Error t) {
    // A deferred interrupt still takes precedence for callers, but the failure that ended the
    // loop is attached to it instead of being silently discarded.
    if (intEx != null && intEx != t) {
      intEx.addSuppressed(t);
      throw intEx;
    }
    throw t;
  }
  if (intEx != null) {
    throw intEx;
  }
}