in PluginsAndFeatures/azure-toolkit-for-intellij/src/main/java/com/microsoft/azure/hdinsight/spark/run/SparkBatchJobDebuggerRunner.java [123:347]
public void execute(final ExecutionEnvironment environment) throws ExecutionException {
final RunProfileState state = environment.getState();
if (state == null) {
return;
}
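// Bind the telemetry operation for this run and prepare a promise that resolves once the driver JVM is ready for attaching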
final Operation operation = environment.getUserData(TelemetryKeys.OPERATION);
final AsyncPromise<ExecutionEnvironment> jobDriverEnvReady = new AsyncPromise<>();
final SparkBatchRemoteDebugState submissionState = (SparkBatchRemoteDebugState) state;
final SparkSubmitModel submitModel = submissionState.getSubmitModel();
// Create the SSH debug session first
final SparkBatchDebugSession session;
try {
session = SparkBatchDebugSession
.factoryByAuth(getSparkJobUrl(submitModel), submitModel.getAdvancedConfigModel())
.open()
.verifyCertificate();
} catch (final Exception e) {
final ExecutionException exp = new ExecutionException("Failed to create SSH session for debugging. "
+ ExceptionUtils.getRootCauseMessage(e));
EventUtil.logErrorClassNameOnlyWithComplete(operation, ErrorType.systemError, exp, null, null);
throw exp;
}
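// Gather the project-scoped helpers: IDEA schedulers for background work and a subject that multiplexes debug lifecycle events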
final Project project = submitModel.getProject();
final ExecutionManager executionManager = ExecutionManager.getInstance(project);
final IdeaSchedulers schedulers = new IdeaSchedulers(project);
final PublishSubject<SparkBatchJobSubmissionEvent> debugEventSubject = PublishSubject.create();
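// Clone the Spark batch job so this debug run owns an independent instance; its control subject carries the deployment log lines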
final ISparkBatchDebugJob sparkDebugBatch = (ISparkBatchDebugJob) submissionState.getSparkBatch().clone();
final PublishSubject<SparkLogLine> ctrlSubject =
(PublishSubject<SparkLogLine>) sparkDebugBatch.getCtrlSubject();
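// The driver debug process deploys the selected artifact and controls the remote JVM through the SSH session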
final SparkBatchJobRemoteDebugProcess driverDebugProcess = new SparkBatchJobRemoteDebugProcess(
schedulers,
session,
sparkDebugBatch,
submitModel.getArtifactPath().orElseThrow(() -> new ExecutionException("No artifact selected")),
submitModel.getSubmissionParameter().getMainClassName(),
submitModel.getAdvancedConfigModel(),
ctrlSubject);
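// Wrap the debug process in a process handler so the IDE can track its lifecycle and termination requests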
final SparkBatchJobDebugProcessHandler driverDebugHandler =
new SparkBatchJobDebugProcessHandler(project, driverDebugProcess, debugEventSubject);
// Prepare an independent submission console
final ConsoleViewImpl submissionConsole = new ConsoleViewImpl(project, true);
final RunContentDescriptor submissionDesc = new RunContentDescriptor(
submissionConsole,
driverDebugHandler,
submissionConsole.getComponent(),
String.format("Submit %s to cluster %s",
submitModel.getSubmissionParameter().getMainClassName(),
submitModel.getSubmissionParameter().getClusterName()));
// Show the submission console view
executionManager.getContentManager().showRunContent(environment.getExecutor(), submissionDesc);
// Print the deployment control messages to the submission console, colored by message type
final Subscription jobSubscription = ctrlSubject.subscribe(typedMessage -> {
final String line = typedMessage.getRawLog() + "\n";
switch (typedMessage.getMessageInfoType()) {
case Error:
submissionConsole.print(line, ConsoleViewContentType.ERROR_OUTPUT);
break;
case Info:
submissionConsole.print(line, ConsoleViewContentType.NORMAL_OUTPUT);
break;
case Log:
submissionConsole.print(line, ConsoleViewContentType.SYSTEM_OUTPUT);
break;
case Warning:
submissionConsole.print(line, ConsoleViewContentType.LOG_WARNING_OUTPUT);
break;
}
}, err -> {
submissionConsole.print(ExceptionUtils.getRootCauseMessage(err), ConsoleViewContentType.ERROR_OUTPUT);
final String errMsg = "The Spark job remote debug is cancelled due to "
+ ExceptionUtils.getRootCauseMessage(err);
jobDriverEnvReady.setError(errMsg);
EventUtil.logErrorClassNameOnlyWithComplete(operation,
ErrorType.systemError,
new UncheckedExecutionException(errMsg, err),
null,
null);
}, () -> {
if (Optional.ofNullable(driverDebugHandler.getUserData(ProcessHandler.TERMINATION_REQUESTED))
.orElse(false)) {
final String errMsg = "The Spark job remote debug is cancelled by user.";
jobDriverEnvReady.setError(errMsg);
final Map<String, String> props = ImmutableMap.of("isDebugCancelled", "true");
EventUtil.logErrorClassNameOnlyWithComplete(
operation, ErrorType.userError, new ExecutionException(errMsg), props, null);
}
});
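// React to debug lifecycle events: a handler-ready event attaches the debugger to the driver or an executor; an executor-created event spawns a new executor debug process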
// Close the SSH session once the event stream terminates, whether it completes or errors
debugEventSubject.subscribeOn(Schedulers.io()).doAfterTerminate(session::close).subscribe(debugEvent -> {
try {
if (debugEvent instanceof SparkBatchRemoteDebugHandlerReadyEvent) {
final SparkBatchRemoteDebugHandlerReadyEvent handlerReadyEvent =
(SparkBatchRemoteDebugHandlerReadyEvent) debugEvent;
final SparkBatchDebugJobJdbPortForwardedEvent jdbReadyEvent =
handlerReadyEvent.getJdbPortForwardedEvent();
if (!jdbReadyEvent.getLocalJdbForwardedPort().isPresent()) {
return;
}
final int localPort = jdbReadyEvent.getLocalJdbForwardedPort().get();
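// Fork a child execution environment targeting this remote JVM (driver or executor)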
final ExecutionEnvironment forkEnv = forkEnvironment(
environment, jdbReadyEvent.getRemoteHost().orElse("unknown"), jdbReadyEvent.isDriver());
final RunProfile runProfile = forkEnv.getRunProfile();
if (!(runProfile instanceof LivySparkBatchJobRunConfiguration)) {
ctrlSubject.onError(new UnsupportedOperationException(
"Only supports LivySparkBatchJobRunConfiguration type, but got type "
+ runProfile.getClass().getCanonicalName()));
return;
}
// Reuse the driver's Spark batch job
((LivySparkBatchJobRunConfiguration) runProfile).setSparkRemoteBatch(sparkDebugBatch);
final SparkBatchRemoteDebugState forkState = jdbReadyEvent.isDriver()
? submissionState
: (SparkBatchRemoteDebugState) forkEnv.getState();
if (forkState == null) {
return;
}
// Set the debug connection to localhost and local forwarded port to the state
forkState.setRemoteConnection(
new RemoteConnection(true, "localhost", Integer.toString(localPort), false));
// Prepare the debug tab console view UI
final SparkJobLogConsoleView jobOutputView = new SparkJobLogConsoleView(project);
// Get YARN container log URL port
final int containerLogUrlPort =
((SparkBatchRemoteDebugJob) driverDebugProcess.getSparkJob())
.getYarnContainerLogUrlPort()
.toBlocking()
.single();
// Parse container ID and host URL from driver console view
jobOutputView.getSecondaryConsoleView().addMessageFilter((line, entireLength) -> {
final Matcher matcher = Pattern.compile(
"Launching container (\\w+).* on host ([a-zA-Z_0-9-.]+)",
Pattern.CASE_INSENSITIVE)
.matcher(line);
while (matcher.find()) {
final String containerId = matcher.group(1);
// TODO: get port from somewhere else rather than hard code here
final URI hostUri = URI.create(String.format("http://%s:%d",
matcher.group(2),
containerLogUrlPort));
debugEventSubject.onNext(new SparkBatchJobExecutorCreatedEvent(hostUri, containerId));
}
return null;
});
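// Attach the job output view to the ready debug process and hand the execution results to the forked state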
jobOutputView.attachToProcess(handlerReadyEvent.getDebugProcessHandler());
final ExecutionResult result = new DefaultExecutionResult(
jobOutputView, handlerReadyEvent.getDebugProcessHandler());
forkState.setExecutionResult(result);
forkState.setConsoleView(jobOutputView.getSecondaryConsoleView());
forkState.setRemoteProcessCtrlLogHandler(handlerReadyEvent.getDebugProcessHandler());
if (jdbReadyEvent.isDriver()) {
// Let the debug console view handle the control log from now on
jobSubscription.unsubscribe();
// Resolve the job driver promise; attaching to the driver VM is handled separately
jobDriverEnvReady.setResult(forkEnv);
} else {
// Start Executor debugging
executionManager.startRunProfile(forkEnv, () ->
toIdeaPromise(attachAndDebug(forkEnv, forkState)));
}
} else if (debugEvent instanceof SparkBatchJobExecutorCreatedEvent) {
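// An executor container was detected in the driver log; build a matching debug process for it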
final SparkBatchJobExecutorCreatedEvent executorCreatedEvent =
(SparkBatchJobExecutorCreatedEvent) debugEvent;
final String containerId = executorCreatedEvent.getContainerId();
final SparkBatchRemoteDebugJob debugJob =
(SparkBatchRemoteDebugJob) driverDebugProcess.getSparkJob();
final URI internalHostUri = executorCreatedEvent.getHostUri();
final URI executorLogUrl = debugJob.convertToPublicLogUri(internalHostUri)
.map(uri -> uri.resolve(String.format("node/containerlogs/%s/livy",
containerId)))
.toBlocking().singleOrDefault(internalHostUri);
// Create an executor debug process over the shared SSH debug session
final SparkBatchJobRemoteDebugExecutorProcess executorDebugProcess =
new SparkBatchJobRemoteDebugExecutorProcess(
schedulers,
debugJob,
internalHostUri.getHost(),
driverDebugProcess.getDebugSession(),
executorLogUrl.toString());
final SparkBatchJobDebugProcessHandler executorDebugHandler =
new SparkBatchJobDebugProcessHandler(project, executorDebugProcess, debugEventSubject);
executorDebugHandler.getRemoteDebugProcess().start();
}
} catch (final ExecutionException e) {
EventUtil.logErrorClassNameOnlyWithComplete(
operation, ErrorType.systemError, new UncheckedExecutionException(e), null, null);
throw new UncheckedExecutionException(e);
}
});
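// Kick off the driver-side remote debug process, which submits the job and emits events to debugEventSubject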
driverDebugHandler.getRemoteDebugProcess().start();
// Start the driver-side run profile, using IntelliJ's AsyncPromise to wait until the Spark app is deployed
executionManager.startRunProfile(environment, () -> jobDriverEnvReady.thenAsync(driverEnv ->
toIdeaPromise(attachAndDebug(driverEnv, state))));
}