in src/main/java/com/microsoft/azure/spark/tools/job/LivySparkBatch.java [352:371]
/**
 * Waits until the Spark batch job has either started running or finished.
 *
 * <p>The batch obtained from {@code get()} is fetched with up to {@code getRetriesMax()}
 * retries on error. The fetch is repeated through {@code repeatWhen}: every repeat
 * notification first publishes an {@code Info} message on the control subject and is then
 * delayed by {@code getDelaySeconds()} seconds. Polling stops at the first batch whose state
 * is reported as done or running; batches seen before that are dropped.
 *
 * @return an observable that emits the state of the first batch that is done or running;
 *         it signals a {@code SparkJobException} carrying the newline-joined submission logs
 *         when that batch is done but not successful
 */
public Observable<String> awaitStarted() {
    return get()
            .retry(getRetriesMax())
            .repeatWhen(completions -> completions
                    // Tell listeners we are still waiting before scheduling the next poll.
                    .doOnNext(tick -> getCtrlSubject().onNext(new Pair<>(Info, "The Spark job is starting...")))
                    .delay(getDelaySeconds(), TimeUnit.SECONDS))
            // takeUntil ends the polling on the first done/running batch; filter then drops
            // every earlier batch so only that final one reaches flatMap.
            .takeUntil(polled -> isDoneOrRunning(polled.state))
            .filter(polled -> isDoneOrRunning(polled.state))
            .flatMap(settled -> {
                final boolean startedOk = !isDone(settled.state) || isSuccess(settled.state);
                if (startedOk) {
                    return Observable.just(settled.state);
                }

                final String submissionLog = String.join("\n", settled.submissionLogs);
                return Observable.error(
                        new SparkJobException("The Spark job failed to start due to " + submissionLog));
            });
}

/** Tells whether the given batch state is reported as done or as running. */
private boolean isDoneOrRunning(String state) {
    return isDone(state) || isRunning(state);
}