public void execute()

in PluginsAndFeatures/azure-toolkit-for-intellij/src/main/java/com/microsoft/azure/hdinsight/spark/run/SparkBatchJobDebuggerRunner.java [123:347]


    public void execute(final ExecutionEnvironment environment) throws ExecutionException {
        final RunProfileState state = environment.getState();
        if (state == null) {
            return;
        }

        final Operation operation = environment.getUserData(TelemetryKeys.OPERATION);
        final AsyncPromise<ExecutionEnvironment> jobDriverEnvReady = new AsyncPromise<>();
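        // This runner only handles Spark remote debug runs, so the profile state is assumed to be a SparkBatchRemoteDebugState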
        final SparkBatchRemoteDebugState submissionState = (SparkBatchRemoteDebugState) state;

        final SparkSubmitModel submitModel = submissionState.getSubmitModel();

        // Create the SSH debug session first
        final SparkBatchDebugSession session;
        try {
            session = SparkBatchDebugSession
                    .factoryByAuth(getSparkJobUrl(submitModel), submitModel.getAdvancedConfigModel())
                    .open()
                    .verifyCertificate();
        } catch (final Exception e) {
            final ExecutionException exp = new ExecutionException("Failed to create SSH session for debugging. "
                                                                          + ExceptionUtils.getRootCauseMessage(e));
            EventUtil.logErrorClassNameOnlyWithComplete(operation, ErrorType.systemError, exp, null, null);
            throw exp;
        }

        final Project project = submitModel.getProject();
        final ExecutionManager executionManager = ExecutionManager.getInstance(project);
        final IdeaSchedulers schedulers = new IdeaSchedulers(project);
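        // Debug lifecycle events (handler ready, executor created, ...) from the driver and executor processes are published here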
        final PublishSubject<SparkBatchJobSubmissionEvent> debugEventSubject = PublishSubject.create();
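        // Work on a clone of the submission state's Spark batch job so this debug run has its own instance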
        final ISparkBatchDebugJob sparkDebugBatch = (ISparkBatchDebugJob) submissionState.getSparkBatch().clone();
        final PublishSubject<SparkLogLine> ctrlSubject =
                (PublishSubject<SparkLogLine>) sparkDebugBatch.getCtrlSubject();
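        // The driver debug process submits the job (artifact + main class) and exposes the driver's JDB port through the SSH session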
        final SparkBatchJobRemoteDebugProcess driverDebugProcess = new SparkBatchJobRemoteDebugProcess(
                schedulers,
                session,
                sparkDebugBatch,
                submitModel.getArtifactPath().orElseThrow(() -> new ExecutionException("No artifact selected")),
                submitModel.getSubmissionParameter().getMainClassName(),
                submitModel.getAdvancedConfigModel(),
                ctrlSubject);

        final SparkBatchJobDebugProcessHandler driverDebugHandler =
                new SparkBatchJobDebugProcessHandler(project, driverDebugProcess, debugEventSubject);

        // Prepare an independent submission console
        final ConsoleViewImpl submissionConsole = new ConsoleViewImpl(project, true);
        final RunContentDescriptor submissionDesc = new RunContentDescriptor(
                submissionConsole,
                driverDebugHandler,
                submissionConsole.getComponent(),
                String.format("Submit %s to cluster %s",
                              submitModel.getSubmissionParameter().getMainClassName(),
                              submitModel.getSubmissionParameter().getClusterName()));

        // Show the submission console view
        executionManager.getContentManager().showRunContent(environment.getExecutor(), submissionDesc);

        // Use the submission console to display deployment control messages
        final Subscription jobSubscription = ctrlSubject.subscribe(typedMessage -> {
            final String line = typedMessage.getRawLog() + "\n";

            switch (typedMessage.getMessageInfoType()) {
                case Error:
                    submissionConsole.print(line, ConsoleViewContentType.ERROR_OUTPUT);
                    break;
                case Info:
                    submissionConsole.print(line, ConsoleViewContentType.NORMAL_OUTPUT);
                    break;
                case Log:
                    submissionConsole.print(line, ConsoleViewContentType.SYSTEM_OUTPUT);
                    break;
                case Warning:
                    submissionConsole.print(line, ConsoleViewContentType.LOG_WARNING_OUTPUT);
                    break;
            }
        }, err -> {
            submissionConsole.print(ExceptionUtils.getRootCauseMessage(err), ConsoleViewContentType.ERROR_OUTPUT);
            final String errMsg = "The Spark job remote debug is cancelled due to "
                    + ExceptionUtils.getRootCauseMessage(err);
            jobDriverEnvReady.setError(errMsg);
            EventUtil.logErrorClassNameOnlyWithComplete(operation,
                                           ErrorType.systemError,
                                           new UncheckedExecutionException(errMsg, err),
                                           null,
                                           null);
        }, () -> {
            if (Optional.ofNullable(driverDebugHandler.getUserData(ProcessHandler.TERMINATION_REQUESTED))
                        .orElse(false)) {
                final String errMsg = "The Spark job remote debug is cancelled by user.";
                jobDriverEnvReady.setError(errMsg);

                final Map<String, String> props = ImmutableMap.of("isDebugCancelled", "true");
                EventUtil.logErrorClassNameOnlyWithComplete(
                        operation, ErrorType.userError, new ExecutionException(errMsg), props, null);
            }
        });

        // Handle debug events; close the SSH session once the event stream terminates (completes or errors)
        debugEventSubject.subscribeOn(Schedulers.io()).doAfterTerminate(session::close).subscribe(debugEvent -> {
            try {
                if (debugEvent instanceof SparkBatchRemoteDebugHandlerReadyEvent) {
                    final SparkBatchRemoteDebugHandlerReadyEvent handlerReadyEvent =
                            (SparkBatchRemoteDebugHandlerReadyEvent) debugEvent;
                    final SparkBatchDebugJobJdbPortForwardedEvent jdbReadyEvent =
                            handlerReadyEvent.getJdbPortForwardedEvent();

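                    // Nothing to attach to until a local JDB port has been forwarded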
                    if (!jdbReadyEvent.getLocalJdbForwardedPort().isPresent()) {
                        return;
                    }

                    final int localPort = jdbReadyEvent.getLocalJdbForwardedPort().get();

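                    // Fork a child execution environment for this debug target (the driver or an executor)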
                    final ExecutionEnvironment forkEnv = forkEnvironment(
                            environment, jdbReadyEvent.getRemoteHost().orElse("unknown"), jdbReadyEvent.isDriver());

                    final RunProfile runProfile = forkEnv.getRunProfile();
                    if (!(runProfile instanceof LivySparkBatchJobRunConfiguration)) {
                        ctrlSubject.onError(new UnsupportedOperationException(
                                "Only LivySparkBatchJobRunConfiguration is supported, but got type "
                                        + runProfile.getClass().getCanonicalName()));

                        return;
                    }

                    // Reuse the driver's Spark batch job
                    ((LivySparkBatchJobRunConfiguration) runProfile).setSparkRemoteBatch(sparkDebugBatch);

                    final SparkBatchRemoteDebugState forkState = jdbReadyEvent.isDriver()
                                                                 ? submissionState
                                                                 : (SparkBatchRemoteDebugState) forkEnv.getState();

                    if (forkState == null) {
                        return;
                    }

                    // Set the debug connection to localhost and local forwarded port to the state
                    forkState.setRemoteConnection(
                            new RemoteConnection(true, "localhost", Integer.toString(localPort), false));

                    // Prepare the debug tab console view UI
                    final SparkJobLogConsoleView jobOutputView = new SparkJobLogConsoleView(project);
                    // Get YARN container log URL port
                    final int containerLogUrlPort =
                            ((SparkBatchRemoteDebugJob) driverDebugProcess.getSparkJob())
                                    .getYarnContainerLogUrlPort()
                                    .toBlocking()
                                    .single();
                    // Parse container ID and host URL from driver console view
                    jobOutputView.getSecondaryConsoleView().addMessageFilter((line, entireLength) -> {
                        final Matcher matcher = Pattern.compile(
                                "Launching container (\\w+).* on host ([a-zA-Z_0-9-.]+)",
                                Pattern.CASE_INSENSITIVE)
                                .matcher(line);
                        while (matcher.find()) {
                            final String containerId = matcher.group(1);
                            // TODO: get port from somewhere else rather than hard code here
                            final URI hostUri = URI.create(String.format("http://%s:%d",
                                                                         matcher.group(2),
                                                                         containerLogUrlPort));
                            debugEventSubject.onNext(new SparkBatchJobExecutorCreatedEvent(hostUri, containerId));
                        }
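                        // Return null: the filter only sniffs executor container lines and applies no highlighting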
                        return null;
                    });
                    jobOutputView.attachToProcess(handlerReadyEvent.getDebugProcessHandler());

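                    // Hand the console view and process handler over to the forked state for the debug tab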
                    final ExecutionResult result = new DefaultExecutionResult(
                            jobOutputView, handlerReadyEvent.getDebugProcessHandler());
                    forkState.setExecutionResult(result);
                    forkState.setConsoleView(jobOutputView.getSecondaryConsoleView());
                    forkState.setRemoteProcessCtrlLogHandler(handlerReadyEvent.getDebugProcessHandler());

                    if (jdbReadyEvent.isDriver()) {
                        // Let the debug console view take over handling of the control log
                        jobSubscription.unsubscribe();

                        // Resolve the job driver promise; the driver VM attach is handled separately
                        jobDriverEnvReady.setResult(forkEnv);
                    } else {
                        // Start Executor debugging
                        executionManager.startRunProfile(forkEnv, () ->
                                toIdeaPromise(attachAndDebug(forkEnv, forkState)));
                    }
                } else if (debugEvent instanceof SparkBatchJobExecutorCreatedEvent) {
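                    // An executor container was spotted in the driver log; set up a remote debug process for it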
                    final SparkBatchJobExecutorCreatedEvent executorCreatedEvent =
                            (SparkBatchJobExecutorCreatedEvent) debugEvent;

                    final String containerId = executorCreatedEvent.getContainerId();
                    final SparkBatchRemoteDebugJob debugJob =
                            (SparkBatchRemoteDebugJob) driverDebugProcess.getSparkJob();

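                    // Resolve a public YARN container log URL for the executor, falling back to the internal host URI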
                    final URI internalHostUri = executorCreatedEvent.getHostUri();
                    final URI executorLogUrl = debugJob
                            .convertToPublicLogUri(internalHostUri)
                            .map(uri -> uri.resolve(String.format("node/containerlogs/%s/livy", containerId)))
                            .toBlocking()
                            .singleOrDefault(internalHostUri);

                    // Create an executor debug process
                    final SparkBatchJobRemoteDebugExecutorProcess executorDebugProcess =
                            new SparkBatchJobRemoteDebugExecutorProcess(
                                    schedulers,
                                    debugJob,
                                    internalHostUri.getHost(),
                                    driverDebugProcess.getDebugSession(),
                                    executorLogUrl.toString());

                    final SparkBatchJobDebugProcessHandler executorDebugHandler =
                            new SparkBatchJobDebugProcessHandler(project, executorDebugProcess, debugEventSubject);

                    executorDebugHandler.getRemoteDebugProcess().start();
                }
            } catch (final ExecutionException e) {
                EventUtil.logErrorClassNameOnlyWithComplete(
                        operation, ErrorType.systemError, new UncheckedExecutionException(e), null, null);
                throw new UncheckedExecutionException(e);
            }
        });

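        // Start the driver debug process; its progress is reported through ctrlSubject and debugEventSubject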
        driverDebugHandler.getRemoteDebugProcess().start();

        // Driver-side execution: use IntelliJ's AsyncPromise to wait until the Spark app is deployed
        executionManager.startRunProfile(environment, () -> jobDriverEnvReady.thenAsync(driverEnv ->
                toIdeaPromise(attachAndDebug(driverEnv, state))));
    }