in src/main/java/com/microsoft/azure/spark/tools/processes/SparkBatchJobRemoteProcess.java [142:178]
/**
 * Starts the remote Spark batch job pipeline: prepares and deploys the artifact,
 * submits the job, waits for it to enter a started state, attaches the job's
 * output streams, and then awaits completion. The resulting Rx subscription is
 * stored in {@code jobSubscription} so it can be unsubscribed later.
 *
 * <p>On completion a RESULT summary is written to the control channel; on error
 * the process is either marked destroyed (job already finished) or destroyed.
 */
public void start() {
    // Build, deploy and wait for the job done.
    jobSubscription.set(prepareArtifact()
            .flatMap(this::submitJob)
            .delay(SparkBatchJob::awaitStarted)
            .flatMap(this::attachInputStreams)
            .flatMap(this::awaitForJobDone)
            .subscribe(
                    sdPair -> {
                        // sdPair: key = final job state, value = diagnostics text.
                        // The RESULT banner is common to both outcomes.
                        ctrlInfo("");
                        ctrlInfo("========== RESULT ==========");
                        if (sparkJob.isSuccess(sdPair.getKey())) {
                            ctrlInfo("Job run successfully.");
                        } else {
                            ctrlError("Job state is " + sdPair.getKey());
                            ctrlError("Diagnostics: " + sdPair.getValue());
                        }
                    },
                    err -> {
                        if (err instanceof SparkJobFinishedException
                                || err.getCause() instanceof SparkJobFinishedException) {
                            // If we call destroy() when job is dead,
                            // we will get exception with `job is finished` error message
                            ctrlError("Job is already finished.");
                            isDestroyed = true;
                            disconnect(EXIT_ERROR_UNEXPECTED_STOP);
                        } else {
                            // getMessage() can be null (e.g. for some wrapped exceptions);
                            // fall back to toString() so the error report is never blank.
                            ctrlError(err.getMessage() != null ? err.getMessage() : err.toString());
                            destroy();
                        }
                    },
                    () -> disconnect(EXIT_OK)));
}