public void start()

in src/main/java/com/microsoft/azure/spark/tools/processes/SparkBatchJobRemoteProcess.java [142:178]


    /**
     * Starts the remote Spark batch job and reports its outcome on the control channel.
     *
     * <p>The steps are chained in this order: {@code prepareArtifact()},
     * {@code submitJob}, a delay driven by {@code SparkBatchJob::awaitStarted},
     * {@code attachInputStreams} and finally {@code awaitForJobDone}. The resulting
     * subscription is stored in {@code jobSubscription}.
     *
     * <p>Outcome handling:
     * <ul>
     *   <li>Each emitted pair is treated as (job state, diagnostics): a RESULT banner is
     *       printed, followed by either a success line or the state and diagnostics as
     *       errors, depending on {@code sparkJob.isSuccess(state)}.</li>
     *   <li>A {@code SparkJobFinishedException} (directly or as the immediate cause) marks
     *       this process as destroyed and disconnects with
     *       {@code EXIT_ERROR_UNEXPECTED_STOP}.</li>
     *   <li>Any other error is reported and {@code destroy()} is invoked.</li>
     *   <li>Normal completion disconnects with {@code EXIT_OK}.</li>
     * </ul>
     */
    public void start() {
        // Build, deploy and wait for the job done.
        jobSubscription.set(prepareArtifact()
                .flatMap(this::submitJob)
                .delay(SparkBatchJob::awaitStarted)
                .flatMap(this::attachInputStreams)
                .flatMap(this::awaitForJobDone)
                .subscribe(
                        sdPair -> {
                            // Evaluate the state first so the banner is only printed once the
                            // success check itself has gone through.
                            final boolean succeeded = sparkJob.isSuccess(sdPair.getKey());

                            ctrlInfo("");
                            ctrlInfo("========== RESULT ==========");

                            if (succeeded) {
                                ctrlInfo("Job run successfully.");
                            } else {
                                ctrlError("Job state is " + sdPair.getKey());
                                ctrlError("Diagnostics: " + sdPair.getValue());
                            }
                        },
                        err -> {
                            if (err instanceof SparkJobFinishedException
                                    || err.getCause() instanceof SparkJobFinishedException) {
                                // If we call destroy() when job is dead,
                                // we will get exception with `job is finished` error message
                                ctrlError("Job is already finished.");
                                isDestroyed = true;
                                disconnect(EXIT_ERROR_UNEXPECTED_STOP);
                            } else {
                                // Throwable.getMessage() may be null (e.g. a bare
                                // NullPointerException); fall back to toString() so the
                                // user still sees which error occurred.
                                final String message = err.getMessage();
                                ctrlError(message != null ? message : err.toString());
                                destroy();
                            }
                        },
                        () -> {
                            disconnect(EXIT_OK);
                        }));
    }