in bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/AbstractJob.java [102:138]
public void run() {
boolean success = true;
try {
// Persist job state and required data.
beforeRun();
// Send job cache to agents
List<String> hostnames = stages.stream()
.map(Stage::getStageContext)
.map(StageContext::getHostnames)
.flatMap(List::stream)
.distinct()
.toList();
JobCacheHelper.sendJobCache(jobPO.getId(), hostnames);
LinkedBlockingQueue<Stage> queue = new LinkedBlockingQueue<>(stages);
while (!queue.isEmpty()) {
Stage stage = queue.poll();
Boolean stageSuccess = stage.run();
if (!stageSuccess) {
success = false;
break;
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
success = false;
}
if (success) {
onSuccess();
} else {
onFailure();
}
}