public CosmosSparkClusterOpsCtrl()

in PluginsAndFeatures/azure-toolkit-for-intellij/src/main/java/com/microsoft/azure/cosmosspark/CosmosSparkClusterOpsCtrl.java [62:267]


    public CosmosSparkClusterOpsCtrl(@NotNull CosmosSparkClusterOps sparkServerlessClusterOps) {
        this.sparkServerlessClusterOps = sparkServerlessClusterOps;

        this.sparkServerlessClusterOps.getDestroyAction()
                .observeOn(ideSchedulers.dispatchUIThread())
                .subscribe(triplet -> {
                    log().info(String.format("Destroy message received. AdlAccount: %s, cluster: %s, currentNode: %s",
                            triplet.getLeft().getName(),
                            // Type cast is necessary for DestroyableCluster
                            ((AzureSparkCosmosCluster) triplet.getMiddle()).getName(),
                            triplet.getRight().getName()));
                    CosmosSparkClusterDestoryDialog destroyDialog = new CosmosSparkClusterDestoryDialog(
                            triplet.getRight(), (AzureSparkCosmosCluster) triplet.getMiddle());
                    destroyDialog.show();
                }, ex -> log().warn(ExceptionUtils.getStackTrace(ex)));

        this.sparkServerlessClusterOps.getProvisionAction()
                .observeOn(ideSchedulers.dispatchUIThread())
                .subscribe(pair -> {
                    log().info(String.format("Provision message received. AdlAccount: %s, node: %s",
                            pair.getLeft().getName(), pair.getRight().getName()));
                    CosmosSparkProvisionDialog provisionDialog = new CosmosSparkProvisionDialog(
                            pair.getRight(), pair.getLeft());
                    provisionDialog.show();
                }, ex -> log().warn(ExceptionUtils.getStackTrace(ex)));

        this.sparkServerlessClusterOps.getMonitorAction()
                .observeOn(ideSchedulers.dispatchUIThread())
                .subscribe(pair -> {
                    log().info(String.format("Monitor message received. cluster: %s, node: %s",
                            pair.getLeft().getName(), pair.getRight().getName()));
                    CosmosSparkClusterMonitorDialog monitorDialog = new CosmosSparkClusterMonitorDialog(
                            pair.getRight(), pair.getLeft());
                    monitorDialog.show();
                }, ex -> log().warn(ExceptionUtils.getStackTrace(ex)));

        this.sparkServerlessClusterOps.getUpdateAction()
                .observeOn(ideSchedulers.dispatchUIThread())
                .subscribe(pair -> {
                    log().info(String.format("Update message received. cluster: %s, node: %s",
                            pair.getLeft().getName(), pair.getRight().getName()));
                    CosmosSparkClusterUpdateDialog updateDialog = new CosmosSparkClusterUpdateDialog(
                            pair.getRight(), pair.getLeft());
                    updateDialog.show();
                }, ex -> log().warn(ExceptionUtils.getStackTrace(ex)));

        this.sparkServerlessClusterOps.getSubmitAction()
                .observeOn(ideSchedulers.dispatchUIThread())
                .subscribe(clusterNodePair -> {
                    log().info(String.format("Submit message received. cluster: %s, node: %s",
                            clusterNodePair.getLeft(), clusterNodePair.getRight()));

                    try {
                        AzureSparkCosmosCluster cluster = clusterNodePair.getLeft();
                        SparkAppSubmitContext context = new SparkAppSubmitContext();
                        Project project = (Project) clusterNodePair.getRight().getProject();

                        final RunManager runManager = RunManager.getInstance(project);
                        final List<RunnerAndConfigurationSettings> batchConfigSettings = runManager
                                .getConfigurationSettingsList(CosmosSparkConfigurationType.INSTANCE);

                        final String runConfigName = "[Spark on Cosmos] " + cluster.getClusterIdForConfiguration();
                        final RunnerAndConfigurationSettings runConfigurationSetting = batchConfigSettings.stream()
                                .filter(settings -> settings.getConfiguration().getName().startsWith(runConfigName))
                                .findFirst()
                                .orElseGet(() -> runManager.createRunConfiguration(
                                        runConfigName,
                                        new CosmosSparkConfigurationFactory(CosmosSparkConfigurationType.INSTANCE)));

                        context.putData(RUN_CONFIGURATION_SETTING, runConfigurationSetting)
                                .putData(CLUSTER, cluster);

                        Presentation actionPresentation = new Presentation("Submit Job");
                        actionPresentation.setDescription("Submit specified Spark application into the remote cluster");

                        AnActionEvent event = AnActionEvent.createFromDataContext(
                                String.format("Azure Data Lake Spark pool %s:%s context menu",
                                        cluster.getAccount().getName(), cluster.getName()),
                                actionPresentation,
                                context);

                        new CosmosSparkSelectAndSubmitAction().actionPerformed(event);
                    } catch (Exception ex) {
                        log().warn(ex.getMessage());
                    }
                }, ex -> log().warn(ExceptionUtils.getStackTrace(ex)));

        this.sparkServerlessClusterOps.getServerlessSubmitAction()
                .observeOn(ideSchedulers.dispatchUIThread())
                .subscribe(accountNodePair -> {
                    log().info(String.format("Submit message received. account: %s, node: %s",
                            accountNodePair.getLeft().getName(), accountNodePair.getRight().getName()));

                    try {
                        AzureSparkServerlessAccount adlAccount = accountNodePair.getLeft();
                        SparkAppSubmitContext context = new SparkAppSubmitContext();
                        Project project = (Project) accountNodePair.getRight().getProject();

                        final RunManager runManager = RunManager.getInstance(project);
                        final List<RunnerAndConfigurationSettings> batchConfigSettings = runManager
                                .getConfigurationSettingsList(CosmosServerlessSparkConfigurationType.INSTANCE);

                        final String runConfigName = "[Spark on Cosmos Serverless] " + adlAccount.getName();
                        final RunnerAndConfigurationSettings runConfigurationSetting = batchConfigSettings.stream()
                                .filter(settings -> settings.getConfiguration().getName().startsWith(runConfigName))
                                .findFirst()
                                .orElseGet(() -> runManager.createRunConfiguration(
                                        runConfigName,
                                        new CosmosServerlessSparkConfigurationFactory(CosmosServerlessSparkConfigurationType.INSTANCE)));

                        context.putData(RUN_CONFIGURATION_SETTING, runConfigurationSetting)
                                .putData(CLUSTER, adlAccount);

                        Presentation actionPresentation = new Presentation("Submit Cosmos Serverless Spark Job");
                        actionPresentation.setDescription("Submit specified Spark application into the remote cluster");

                        AnActionEvent event = AnActionEvent.createFromDataContext(
                                String.format("Cosmos Serverless Cluster %s:%s context menu",
                                        adlAccount.getName(), adlAccount.getName()),
                                actionPresentation,
                                context);

                        new CosmosServerlessSparkSelectAndSubmitAction().actionPerformed(event);
                    } catch (Exception ex) {
                        log().warn(ex.getMessage());
                    }
                }, ex -> log().warn(ExceptionUtils.getStackTrace(ex)));

        this.sparkServerlessClusterOps.getViewServerlessJobsAction()
                .observeOn(ideSchedulers.dispatchUIThread())
                .flatMap(accountNodePair -> {
                    log().info(String.format("View serverless jobs message received. account: %s, node: %s",
                            accountNodePair.getLeft().getName(), accountNodePair.getRight().getName()));
                    // check if the requested job list tab exists in tool window
                    AzureSparkServerlessAccount account = accountNodePair.getLeft();
                    CosmosSparkADLAccountNode node = accountNodePair.getRight();
                    ToolWindow toolWindow = ToolWindowManager.getInstance((Project) node.getProject()).getToolWindow("Cosmos Serverless Spark Jobs");
                    Content existingContent= toolWindow.getContentManager().findContent(getDisplayName(account.getName()));
                    if (existingContent != null) {
                        // if the requested job list tab already exists in tool window,
                        // show the existing job list tab
                        toolWindow.getContentManager().setSelectedContent(existingContent);
                        toolWindow.activate(null);
                        return Observable.empty();
                    } else {
                        // create a new tab if the requested job list tab does not exists
                        return account.getSparkBatchJobList()
                                .doOnNext(sparkBatchJobList -> {
                                    // show serverless spark job list
                                    CosmosServerlessSparkBatchJobsViewer jobView = new CosmosServerlessSparkBatchJobsViewer(account) {
                                        @Override
                                        public void refreshActionPerformed(@Nullable AnActionEvent anActionEvent) {
                                            Operation operation = TelemetryManager.createOperation(
                                                    TelemetryConstants.SPARK_ON_COSMOS_SERVERLESS, TelemetryConstants.REFRESH_JOB_VIEW_TABLE);
                                            operation.start();
                                            account.getSparkBatchJobList()
                                                    .doOnNext(jobList -> {
                                                        LivyBatchJobViewer.Model refreshedModel =
                                                                new LivyBatchJobViewer.Model(
                                                                        new LivyBatchJobTableViewport.Model(
                                                                                new LivyBatchJobTableModel(new CosmosServerlessSparkBatchJobsTableSchema()),
                                                                                getFirstJobPage(account, jobList)),
                                                                        null
                                                                );
                                                        this.setData(refreshedModel);
                                                    })
                                                    .subscribe(
                                                            jobList -> {},
                                                            ex -> {
                                                                log().warn(ExceptionUtils.getStackTrace(ex));
                                                                EventUtil.logErrorClassNameOnlyWithComplete(operation, ErrorType.serviceError, ex,
                                                                        ImmutableMap.of("isRefreshJobsTableSucceed", "false"), null);
                                                            },
                                                            () -> EventUtil.logEventWithComplete(EventType.info, operation,
                                                                        ImmutableMap.of("isRefreshJobsTableSucceed", "true"), null)
                                                    );
                                        }
                                    };
                                    LivyBatchJobViewer.Model model =
                                            new LivyBatchJobViewer.Model(
                                                    new LivyBatchJobTableViewport.Model(
                                                            new LivyBatchJobTableModel(new CosmosServerlessSparkBatchJobsTableSchema()),
                                                            getFirstJobPage(account, sparkBatchJobList)),
                                                    null
                                            );
                                    jobView.setData(model);

                                    ContentFactory contentFactory = ContentFactory.SERVICE.getInstance();
                                    Content content = contentFactory.createContent(jobView.getComponent(), getDisplayName(account.getName()), false);
                                    content.setDisposer(jobView);
                                    toolWindow.getContentManager().addContent(content);
                                    toolWindow.getContentManager().setSelectedContent(content);
                                    toolWindow.activate(null);
                                });
                    }
                })
                .doOnError(err -> {
                    String errorHint = "Error loading Serverless jobs. ";
                    log().warn(errorHint + ExceptionUtils.getStackTrace(err));
                    // show warning message when view serverless jobs failed
                    PluginUtil.displayWarningDialog("View Apache Spark on Cosmos Serverless Jobs ", errorHint + err.getMessage());
                })
                // retry should be allowed when error happened
                .retry()
                .subscribe(jobList -> {}, ex -> log().warn(ExceptionUtils.getStackTrace(ex)));
    }