csharp/Microsoft.Azure.Databricks.Client.Sample/SampleProgram.Jobs.cs (121 lines of code) (raw):
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
using Microsoft.Azure.Databricks.Client.Models;
using Polly;
using System.Net;
using Policy = Polly.Policy;
namespace Microsoft.Azure.Databricks.Client.Sample;
internal static partial class SampleProgram
{
/// <summary>
/// Walks through the Jobs API end to end: imports a sample notebook, creates a
/// three-task scheduled job, updates and resets it, triggers a run, waits for the
/// run to finish, prints a preview of every exported view, and finally deletes the
/// run, the job and the sample workspace folder.
/// </summary>
/// <param name="client">Databricks client used for all workspace and job calls.</param>
private static async Task TestJobsApi(DatabricksClient client)
{
    // Maximum number of characters of each exported view echoed to the console.
    const int exportPreviewLength = 200;

    Console.WriteLine($"Creating workspace {SampleWorkspacePath}");
    await client.Workspace.Mkdirs(SampleWorkspacePath);

    Console.WriteLine("Downloading sample notebook");
    var content = await DownloadSampleNotebook();

    Console.WriteLine($"Importing sample HTML notebook to {SampleNotebookPath}");
    await client.Workspace.Import(SampleNotebookPath, ExportFormat.HTML, null,
        content, true);

    var schedule = new CronSchedule
    {
        QuartzCronExpression = "0 0 9 ? * MON-FRI",
        TimezoneId = "Europe/London",
        PauseStatus = PauseStatus.UNPAUSED
    };

    var newCluster = ClusterAttributes.GetNewClusterConfiguration()
        .WithClusterMode(ClusterMode.SingleNode)
        .WithNodeType(NodeTypes.Standard_D3_v2)
        .WithRuntimeVersion(RuntimeVersions.Runtime_10_4);

    var jobSettings = new JobSettings
    {
        MaxConcurrentRuns = 1,
        Schedule = schedule,
        Name = "Sample Job"
    };

    // task1 and task2 are independent; task3 is declared with both of them as dependencies.
    var task1 = jobSettings.AddTask("task1", new NotebookTask { NotebookPath = SampleNotebookPath })
        .WithDescription("Sample Job - task1")
        .WithNewCluster(newCluster);

    var task2 = jobSettings.AddTask("task2", new NotebookTask { NotebookPath = SampleNotebookPath })
        .WithDescription("Sample Job - task2")
        .WithNewCluster(newCluster);

    jobSettings.AddTask("task3", new NotebookTask { NotebookPath = SampleNotebookPath }, new[] { task1, task2 })
        .WithDescription("Sample Job - task3")
        .WithNewCluster(newCluster);

    Console.WriteLine("Creating new job");
    var jobId = await client.Jobs.Create(jobSettings);
    Console.WriteLine("Job created: {0}", jobId);

    // Adding email notifications.
    await client.Jobs.Update(jobId, new JobSettings
    {
        EmailNotifications = new JobEmailNotifications
        {
            OnSuccess = ["someone@example.com"]
        }
    });

    // Removing the email notifications again (only that field is listed for removal).
    await client.Jobs.Update(jobId, null, ["email_notifications"]);

    // Reset job by pausing schedule and attaching libraries to each task.
    var jobInfo = await client.Jobs.Get(jobId);
    jobInfo.Settings.Schedule.PauseStatus = PauseStatus.PAUSED;

    // The libraries must be attached to the settings object that is passed to Reset below;
    // attaching them to the local jobSettings instance would never reach the service.
    foreach (var task in jobInfo.Settings.Tasks)
    {
        task.AttachLibrary(
            new MavenLibrary
            {
                MavenLibrarySpec = new MavenLibrarySpec { Coordinates = "com.microsoft.azure:synapseml_2.12:0.9.5" }
            }
        );
    }

    Console.WriteLine("Resetting job");
    await client.Jobs.Reset(jobId, jobInfo.Settings);

    Console.WriteLine("Run now: {0}", jobId);
    var runId = await client.Jobs.RunNow(jobId);
    Console.WriteLine("Run Id: {0}", runId);

    await WaitForRun(client.Jobs, runId);

    Console.WriteLine($"Exporting tasks from run {runId}");
    var (run, _) = await client.Jobs.RunsGet(runId);

    foreach (var runTask in run.Tasks)
    {
        var viewItems = await client.Jobs.RunsExport(runTask.RunId);

        foreach (var viewItem in viewItems)
        {
            // Only slice when the content is actually longer than the preview; a range
            // past the end of a shorter string throws ArgumentOutOfRangeException.
            var exported = viewItem.Content ?? string.Empty;
            var preview = exported.Length > exportPreviewLength
                ? exported[..exportPreviewLength] + "..."
                : exported;

            Console.WriteLine($"Exported view item from run {runTask.RunId}, task \"{runTask.TaskKey}\", view \"{viewItem.Name}\"");
            Console.WriteLine("====================");
            Console.WriteLine(preview);
            Console.WriteLine("====================");
        }
    }

    Console.WriteLine($"Deleting run {runId}");
    await client.Jobs.RunsDelete(runId);

    Console.WriteLine($"Deleting job {jobId}");
    await client.Jobs.Delete(jobId);

    Console.WriteLine("Deleting sample workspace");
    await client.Workspace.Delete(SampleWorkspacePath, true);
}
#pragma warning disable CS0618 // Type or member is obsolete
/// <summary>
/// Polls a job run until its life cycle state is no longer PENDING, RUNNING or
/// TERMINATING. Selected failures (web exceptions, 502/500 API responses,
/// TEMPORARILY_UNAVAILABLE API errors and request timeouts) are logged and retried;
/// the loop retries indefinitely.
/// </summary>
/// <param name="jobClient">Jobs API used to query the run.</param>
/// <param name="runId">Identifier of the run to wait for.</param>
/// <param name="pollIntervalSeconds">Delay, in seconds, between two consecutive queries.</param>
private static async Task WaitForRun(IJobsApi jobClient, long runId, int pollIntervalSeconds = 15)
{
    var pollInterval = TimeSpan.FromSeconds(pollIntervalSeconds);

    // API errors for which the query is simply repeated.
    static bool IsRetryableApiError(ClientApiException apiError) =>
        apiError.StatusCode is HttpStatusCode.BadGateway or HttpStatusCode.InternalServerError
        || apiError.Message.Contains("\"error_code\":\"TEMPORARILY_UNAVAILABLE\"");

    // A run in one of these states has not finished yet, so polling continues.
    static bool IsStillInProgress(RunState runState) =>
        runState.LifeCycleState is RunLifeCycleState.PENDING
            or RunLifeCycleState.RUNNING
            or RunLifeCycleState.TERMINATING;

    var pollingPolicy = Policy
        .Handle<WebException>()
        .Or<ClientApiException>(IsRetryableApiError)
        // A cancellation that was not requested through the token is a web request timeout.
        .Or<TaskCanceledException>(timeout => !timeout.CancellationToken.IsCancellationRequested)
        .OrResult<RunState>(IsStillInProgress)
        .WaitAndRetryForeverAsync(
            _ => pollInterval,
            (outcome, _) =>
            {
                // Retries caused by an unfinished run carry no exception and are not logged here.
                if (outcome.Exception is { } failure)
                {
                    Console.WriteLine(
                        $"[{DateTime.UtcNow:s}] Failed to query run - {failure}");
                }
            });

    await pollingPolicy.ExecuteAsync(async () =>
    {
        var (currentRun, _) = await jobClient.RunsGet(runId);
        var currentState = currentRun.State;

        Console.WriteLine(
            $"[{DateTime.UtcNow:s}]Run:{runId}\tLifeCycleState:{currentState.LifeCycleState}\tResultState:{currentState.ResultState}\tCompleted:{currentRun.IsCompleted}"
        );

        return currentState;
    });
}
}