in src/main/java/com/microsoft/azure/spark/tools/processes/SparkBatchJobRemoteProcess.java [117:136]
public synchronized void destroy() {
if (!isDestroyed()) {
getSparkJob().killBatchJob()
.doOnEach(notification -> {
this.isDestroyed = true;
if (notification.isOnError()) {
getCtrlSubject().onError(notification.getThrowable());
this.disconnect(EXIT_ERROR_CANNOT_BE_KILLED);
} else if (notification.isOnNext()) {
getEventSubject().onNext(new SparkBatchJobKilledEvent());
this.disconnect(EXIT_ERROR_KILLED_BY_USER);
}
})
.subscribe(
job -> log().info("Killed Spark batch job " + job.getBatchId()),
err -> log().warn("Got error when killing Spark batch job", err),
() -> { });
}
}