in PluginsAndFeatures/azure-toolkit-for-intellij/src/main/java/com/microsoft/azure/cosmosspark/CosmosSparkClusterOpsCtrl.java [62:267]
/**
 * Wires UI handlers onto every Cosmos Spark cluster operation published by
 * {@link CosmosSparkClusterOps}: destroy, provision, monitor, update, submit
 * (cluster-scoped and serverless) and the serverless job-list view.
 *
 * <p>Each subscription observes on the IDE dispatch (UI) thread because every
 * handler opens a dialog or manipulates tool-window content, which must happen
 * on the Swing EDT.
 *
 * @param sparkServerlessClusterOps event source publishing cluster operation requests
 */
public CosmosSparkClusterOpsCtrl(@NotNull CosmosSparkClusterOps sparkServerlessClusterOps) {
    this.sparkServerlessClusterOps = sparkServerlessClusterOps;

    subscribeDestroyAction();
    subscribeProvisionAction();
    subscribeMonitorAction();
    subscribeUpdateAction();
    subscribeSubmitAction();
    subscribeServerlessSubmitAction();
    subscribeViewServerlessJobsAction();
}

/** Shows the destroy-confirmation dialog when a cluster destroy request arrives. */
private void subscribeDestroyAction() {
    this.sparkServerlessClusterOps.getDestroyAction()
            .observeOn(ideSchedulers.dispatchUIThread())
            .subscribe(triplet -> {
                log().info(String.format("Destroy message received. AdlAccount: %s, cluster: %s, currentNode: %s",
                        triplet.getLeft().getName(),
                        // Type cast is necessary for DestroyableCluster
                        ((AzureSparkCosmosCluster) triplet.getMiddle()).getName(),
                        triplet.getRight().getName()));
                CosmosSparkClusterDestoryDialog destroyDialog = new CosmosSparkClusterDestoryDialog(
                        triplet.getRight(), (AzureSparkCosmosCluster) triplet.getMiddle());
                destroyDialog.show();
            }, ex -> log().warn(ExceptionUtils.getStackTrace(ex)));
}

/** Shows the provision dialog when a cluster provision request arrives. */
private void subscribeProvisionAction() {
    this.sparkServerlessClusterOps.getProvisionAction()
            .observeOn(ideSchedulers.dispatchUIThread())
            .subscribe(pair -> {
                log().info(String.format("Provision message received. AdlAccount: %s, node: %s",
                        pair.getLeft().getName(), pair.getRight().getName()));
                CosmosSparkProvisionDialog provisionDialog = new CosmosSparkProvisionDialog(
                        pair.getRight(), pair.getLeft());
                provisionDialog.show();
            }, ex -> log().warn(ExceptionUtils.getStackTrace(ex)));
}

/** Shows the monitor dialog when a cluster monitor request arrives. */
private void subscribeMonitorAction() {
    this.sparkServerlessClusterOps.getMonitorAction()
            .observeOn(ideSchedulers.dispatchUIThread())
            .subscribe(pair -> {
                log().info(String.format("Monitor message received. cluster: %s, node: %s",
                        pair.getLeft().getName(), pair.getRight().getName()));
                CosmosSparkClusterMonitorDialog monitorDialog = new CosmosSparkClusterMonitorDialog(
                        pair.getRight(), pair.getLeft());
                monitorDialog.show();
            }, ex -> log().warn(ExceptionUtils.getStackTrace(ex)));
}

/** Shows the update dialog when a cluster update request arrives. */
private void subscribeUpdateAction() {
    this.sparkServerlessClusterOps.getUpdateAction()
            .observeOn(ideSchedulers.dispatchUIThread())
            .subscribe(pair -> {
                log().info(String.format("Update message received. cluster: %s, node: %s",
                        pair.getLeft().getName(), pair.getRight().getName()));
                CosmosSparkClusterUpdateDialog updateDialog = new CosmosSparkClusterUpdateDialog(
                        pair.getRight(), pair.getLeft());
                updateDialog.show();
            }, ex -> log().warn(ExceptionUtils.getStackTrace(ex)));
}

/**
 * Handles a cluster-scoped job submit request: finds (or creates) the run
 * configuration dedicated to the cluster, then fires the select-and-submit action.
 */
private void subscribeSubmitAction() {
    this.sparkServerlessClusterOps.getSubmitAction()
            .observeOn(ideSchedulers.dispatchUIThread())
            .subscribe(clusterNodePair -> {
                // Log names (not raw toString) for consistency with the other handlers
                log().info(String.format("Submit message received. cluster: %s, node: %s",
                        clusterNodePair.getLeft().getName(), clusterNodePair.getRight().getName()));
                try {
                    AzureSparkCosmosCluster cluster = clusterNodePair.getLeft();
                    SparkAppSubmitContext context = new SparkAppSubmitContext();
                    Project project = (Project) clusterNodePair.getRight().getProject();

                    final RunManager runManager = RunManager.getInstance(project);
                    final List<RunnerAndConfigurationSettings> batchConfigSettings = runManager
                            .getConfigurationSettingsList(CosmosSparkConfigurationType.INSTANCE);

                    // Reuse the cluster's existing run configuration if one exists; otherwise create it
                    final String runConfigName = "[Spark on Cosmos] " + cluster.getClusterIdForConfiguration();
                    final RunnerAndConfigurationSettings runConfigurationSetting = batchConfigSettings.stream()
                            .filter(settings -> settings.getConfiguration().getName().startsWith(runConfigName))
                            .findFirst()
                            .orElseGet(() -> runManager.createRunConfiguration(
                                    runConfigName,
                                    new CosmosSparkConfigurationFactory(CosmosSparkConfigurationType.INSTANCE)));

                    context.putData(RUN_CONFIGURATION_SETTING, runConfigurationSetting)
                            .putData(CLUSTER, cluster);

                    Presentation actionPresentation = new Presentation("Submit Job");
                    actionPresentation.setDescription("Submit specified Spark application into the remote cluster");

                    AnActionEvent event = AnActionEvent.createFromDataContext(
                            String.format("Azure Data Lake Spark pool %s:%s context menu",
                                    cluster.getAccount().getName(), cluster.getName()),
                            actionPresentation,
                            context);
                    new CosmosSparkSelectAndSubmitAction().actionPerformed(event);
                } catch (Exception ex) {
                    // Keep the full stack trace; message-only logging hides the failure site
                    log().warn(ExceptionUtils.getStackTrace(ex));
                }
            }, ex -> log().warn(ExceptionUtils.getStackTrace(ex)));
}

/**
 * Handles a serverless (account-scoped) job submit request: finds (or creates)
 * the account's run configuration, then fires the serverless select-and-submit action.
 */
private void subscribeServerlessSubmitAction() {
    this.sparkServerlessClusterOps.getServerlessSubmitAction()
            .observeOn(ideSchedulers.dispatchUIThread())
            .subscribe(accountNodePair -> {
                log().info(String.format("Submit message received. account: %s, node: %s",
                        accountNodePair.getLeft().getName(), accountNodePair.getRight().getName()));
                try {
                    AzureSparkServerlessAccount adlAccount = accountNodePair.getLeft();
                    SparkAppSubmitContext context = new SparkAppSubmitContext();
                    Project project = (Project) accountNodePair.getRight().getProject();

                    final RunManager runManager = RunManager.getInstance(project);
                    final List<RunnerAndConfigurationSettings> batchConfigSettings = runManager
                            .getConfigurationSettingsList(CosmosServerlessSparkConfigurationType.INSTANCE);

                    // Reuse the account's existing run configuration if one exists; otherwise create it
                    final String runConfigName = "[Spark on Cosmos Serverless] " + adlAccount.getName();
                    final RunnerAndConfigurationSettings runConfigurationSetting = batchConfigSettings.stream()
                            .filter(settings -> settings.getConfiguration().getName().startsWith(runConfigName))
                            .findFirst()
                            .orElseGet(() -> runManager.createRunConfiguration(
                                    runConfigName,
                                    new CosmosServerlessSparkConfigurationFactory(CosmosServerlessSparkConfigurationType.INSTANCE)));

                    context.putData(RUN_CONFIGURATION_SETTING, runConfigurationSetting)
                            .putData(CLUSTER, adlAccount);

                    Presentation actionPresentation = new Presentation("Submit Cosmos Serverless Spark Job");
                    actionPresentation.setDescription("Submit specified Spark application into the remote cluster");

                    // NOTE(review): both placeholders receive the account name — looks like a
                    // copy-paste of the cluster-scoped handler; confirm the intended second value
                    AnActionEvent event = AnActionEvent.createFromDataContext(
                            String.format("Cosmos Serverless Cluster %s:%s context menu",
                                    adlAccount.getName(), adlAccount.getName()),
                            actionPresentation,
                            context);
                    new CosmosServerlessSparkSelectAndSubmitAction().actionPerformed(event);
                } catch (Exception ex) {
                    // Keep the full stack trace; message-only logging hides the failure site
                    log().warn(ExceptionUtils.getStackTrace(ex));
                }
            }, ex -> log().warn(ExceptionUtils.getStackTrace(ex)));
}

/**
 * Handles a "view serverless jobs" request: focuses the account's existing
 * job-list tab in the tool window if present, otherwise fetches the job list
 * and creates a new tab. Errors are surfaced via a warning dialog and the
 * stream is retried so one failure does not kill future requests.
 */
private void subscribeViewServerlessJobsAction() {
    this.sparkServerlessClusterOps.getViewServerlessJobsAction()
            .observeOn(ideSchedulers.dispatchUIThread())
            .flatMap(accountNodePair -> {
                log().info(String.format("View serverless jobs message received. account: %s, node: %s",
                        accountNodePair.getLeft().getName(), accountNodePair.getRight().getName()));
                // check if the requested job list tab exists in tool window
                AzureSparkServerlessAccount account = accountNodePair.getLeft();
                CosmosSparkADLAccountNode node = accountNodePair.getRight();
                ToolWindow toolWindow = ToolWindowManager.getInstance((Project) node.getProject()).getToolWindow("Cosmos Serverless Spark Jobs");
                Content existingContent = toolWindow.getContentManager().findContent(getDisplayName(account.getName()));
                if (existingContent != null) {
                    // if the requested job list tab already exists in tool window,
                    // show the existing job list tab
                    toolWindow.getContentManager().setSelectedContent(existingContent);
                    toolWindow.activate(null);
                    return Observable.empty();
                } else {
                    // create a new tab if the requested job list tab does not exists
                    return account.getSparkBatchJobList()
                            .doOnNext(sparkBatchJobList -> {
                                // show serverless spark job list
                                CosmosServerlessSparkBatchJobsViewer jobView = new CosmosServerlessSparkBatchJobsViewer(account) {
                                    @Override
                                    public void refreshActionPerformed(@Nullable AnActionEvent anActionEvent) {
                                        // Re-fetch the job list and swap in a freshly built table model,
                                        // reporting success/failure through telemetry
                                        Operation operation = TelemetryManager.createOperation(
                                                TelemetryConstants.SPARK_ON_COSMOS_SERVERLESS, TelemetryConstants.REFRESH_JOB_VIEW_TABLE);
                                        operation.start();
                                        account.getSparkBatchJobList()
                                                .doOnNext(jobList -> {
                                                    LivyBatchJobViewer.Model refreshedModel =
                                                            new LivyBatchJobViewer.Model(
                                                                    new LivyBatchJobTableViewport.Model(
                                                                            new LivyBatchJobTableModel(new CosmosServerlessSparkBatchJobsTableSchema()),
                                                                            getFirstJobPage(account, jobList)),
                                                                    null
                                                            );
                                                    this.setData(refreshedModel);
                                                })
                                                .subscribe(
                                                        jobList -> {},
                                                        ex -> {
                                                            log().warn(ExceptionUtils.getStackTrace(ex));
                                                            EventUtil.logErrorClassNameOnlyWithComplete(operation, ErrorType.serviceError, ex,
                                                                    ImmutableMap.of("isRefreshJobsTableSucceed", "false"), null);
                                                        },
                                                        () -> EventUtil.logEventWithComplete(EventType.info, operation,
                                                                ImmutableMap.of("isRefreshJobsTableSucceed", "true"), null)
                                                );
                                    }
                                };
                                LivyBatchJobViewer.Model model =
                                        new LivyBatchJobViewer.Model(
                                                new LivyBatchJobTableViewport.Model(
                                                        new LivyBatchJobTableModel(new CosmosServerlessSparkBatchJobsTableSchema()),
                                                        getFirstJobPage(account, sparkBatchJobList)),
                                                null
                                        );
                                jobView.setData(model);

                                ContentFactory contentFactory = ContentFactory.SERVICE.getInstance();
                                Content content = contentFactory.createContent(jobView.getComponent(), getDisplayName(account.getName()), false);
                                // Dispose the viewer together with its tab to avoid leaking listeners
                                content.setDisposer(jobView);
                                toolWindow.getContentManager().addContent(content);
                                toolWindow.getContentManager().setSelectedContent(content);
                                toolWindow.activate(null);
                            });
                }
            })
            .doOnError(err -> {
                String errorHint = "Error loading Serverless jobs. ";
                log().warn(errorHint + ExceptionUtils.getStackTrace(err));
                // show warning message when view serverless jobs failed
                PluginUtil.displayWarningDialog("View Apache Spark on Cosmos Serverless Jobs ", errorHint + err.getMessage());
            })
            // retry should be allowed when error happened
            .retry()
            .subscribe(jobList -> {}, ex -> log().warn(ExceptionUtils.getStackTrace(ex)));
}