csharp/Microsoft.Azure.Databricks.Client/JobsApiClient.cs (246 lines of code) (raw):

// Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. using Microsoft.Azure.Databricks.Client.Models; using System; using System.Collections.Generic; using System.Linq; using System.Net.Http; using System.Text; using System.Text.Json; using System.Text.Json.Nodes; using System.Threading; using System.Threading.Tasks; namespace Microsoft.Azure.Databricks.Client; public class JobsApiClient : ApiClient, IJobsApi { protected override string ApiVersion => "2.1"; public JobsApiClient(HttpClient httpClient) : base(httpClient) { } public async Task<long> Create(JobSettings jobSettings, IEnumerable<AclPermissionItem> accessControlList = default, CancellationToken cancellationToken = default) { var request = JsonSerializer.SerializeToNode(jobSettings, Options)!.AsObject(); if (accessControlList != null) { request.Add("access_control_list", JsonSerializer.SerializeToNode(accessControlList, Options)); } var jobIdentifier = await HttpPost<JsonObject, JsonObject>(this.HttpClient, $"{ApiVersion}/jobs/create", request, cancellationToken) .ConfigureAwait(false); return jobIdentifier["job_id"]!.GetValue<long>(); } private string BuildJobsListUrl(int limit, string name, bool expandTasks) { StringBuilder url = new($"{ApiVersion}/jobs/list?limit={limit}"); if (name is not null) { url.Append($"&name={name}"); } url.Append($"&expand_tasks={expandTasks.ToString().ToLowerInvariant()}"); return url.ToString(); } [Obsolete("The offset parameter is deprecated. Use method with pageToken to iterate through the pages.")] public async Task<JobList> List(int limit = 20, int offset = 0, string name = default, bool expandTasks = false, CancellationToken cancellationToken = default) { if (limit < 1 || limit > 25) { throw new ArgumentOutOfRangeException(nameof(limit), "limit must be between 1 and 25"); } if (offset < 0) { throw new ArgumentOutOfRangeException(nameof(offset), "offset must be greater than or equal to 0"); } var url = BuildJobsListUrl(limit, name, expandTasks); url += $"&offset={offset}"; var response = await HttpGet<JsonObject>(this.HttpClient, url, cancellationToken) .ConfigureAwait(false); response.TryGetPropertyValue("jobs", out var jobsNode); var jobs = jobsNode?.Deserialize<IEnumerable<Job>>(Options) ?? Enumerable.Empty<Job>(); var hasMore = response.TryGetPropertyValue("has_more", out var hasMoreNode) && hasMoreNode!.GetValue<bool>(); return new JobList { Jobs = jobs, HasMore = hasMore }; } public global::Azure.AsyncPageable<Job> ListPageable(int pageSize = 20, string name = null, bool expandTasks = false, CancellationToken cancellationToken = default) { if (pageSize < 1 || pageSize > 25) { throw new ArgumentOutOfRangeException(nameof(pageSize), "pageSize must be between 1 and 25"); } return new AsyncPageable<Job>(async (nextPageToken) => { var url = BuildJobsListUrl(pageSize, name, expandTasks); url += string.IsNullOrEmpty(nextPageToken) ? string.Empty : $"&page_token={nextPageToken}"; var jobList = await HttpGet<JobList>(this.HttpClient, url, cancellationToken).ConfigureAwait(false); return (jobList.Jobs.ToList(), jobList.HasMore, jobList.NextPageToken); }); } public async Task Delete(long jobId, CancellationToken cancellationToken = default) { await HttpPost(this.HttpClient, $"{ApiVersion}/jobs/delete", new { job_id = jobId }, cancellationToken).ConfigureAwait(false); } public async Task<Job> Get(long jobId, CancellationToken cancellationToken = default) { var requestUri = $"{ApiVersion}/jobs/get?job_id={jobId}"; return await HttpGet<Job>(this.HttpClient, requestUri, cancellationToken).ConfigureAwait(false); } public async Task Reset(long jobId, JobSettings newSettings, CancellationToken cancellationToken = default) { await HttpPost(this.HttpClient, $"{ApiVersion}/jobs/reset", new { job_id = jobId, new_settings = newSettings }, cancellationToken) .ConfigureAwait(false); } public async Task Update(long jobId, JobSettings newSettings, string[] fieldsToRemove = default, CancellationToken cancellationToken = default) { await HttpPost(this.HttpClient, $"{ApiVersion}/jobs/update", new { job_id = jobId, new_settings = newSettings, fields_to_remove = fieldsToRemove }, cancellationToken) .ConfigureAwait(false); } public async Task<long> RunNow(long jobId, RunParameters runParams = default, string idempotencyToken = default, QueueSettings queueSettings = default, CancellationToken cancellationToken = default) { var request = runParams == null ? new JsonObject() : JsonSerializer.SerializeToNode(runParams, Options)!.AsObject(); request.Add("job_id", jobId); if (queueSettings != null) { request.Add("queue", JsonSerializer.SerializeToNode(queueSettings)); } if (!string.IsNullOrEmpty(idempotencyToken)) { request.Add("idempotency_token", idempotencyToken); } var result = await HttpPost<JsonObject, RunIdentifier>( this.HttpClient, $"{ApiVersion}/jobs/run-now", request, cancellationToken ).ConfigureAwait(false); return result.RunId; } public async Task<long> RunSubmit(RunSubmitSettings settings, IEnumerable<AclPermissionItem> accessControlList = default, string idempotencyToken = default, CancellationToken cancellationToken = default) { var request = JsonSerializer.SerializeToNode(settings, Options)!.AsObject(); if (!string.IsNullOrEmpty(idempotencyToken)) { request.Add("idempotency_token", idempotencyToken); } if (accessControlList != null) { request.Add("access_control_list", JsonSerializer.SerializeToNode(accessControlList, Options)); } var result = await HttpPost<JsonObject, RunIdentifier>( this.HttpClient, $"{ApiVersion}/jobs/runs/submit", request, cancellationToken ).ConfigureAwait(false); return result.RunId; } private string BuildRunsListUrl(long? jobId = default, int limit = 25, bool activeOnly = default, bool completedOnly = default, RunType? runType = default, bool expandTasks = default, DateTimeOffset? startTimeFrom = default, DateTimeOffset? startTimeTo = default) { if (activeOnly && completedOnly) { throw new ArgumentException( $"{nameof(activeOnly)} and {nameof(completedOnly)} cannot both be true." ); } StringBuilder url = new($"{ApiVersion}/jobs/runs/list?limit={limit}"); if (jobId.HasValue) { url.Append($"&job_id={jobId.Value}"); } url.Append(activeOnly ? "&active_only=true" : string.Empty); url.Append(completedOnly ? "&completed_only=true" : string.Empty); if (runType.HasValue) { url.Append($"&run_type={runType.Value}"); } url.Append(expandTasks ? "&expand_tasks=true" : string.Empty); if (startTimeFrom.HasValue) { url.Append($"&start_time_from={startTimeFrom.Value.ToUnixTimeMilliseconds()}"); } if (startTimeTo.HasValue) { url.Append($"&start_time_to={startTimeTo.Value.ToUnixTimeMilliseconds()}"); } return url.ToString(); } [Obsolete("The offset parameter is deprecated. Use method with pageToken to iterate through the pages.")] public async Task<RunList> RunsList(long? jobId = default, int offset = 0, int limit = 25, bool activeOnly = default, bool completedOnly = default, RunType? runType = default, bool expandTasks = default, DateTimeOffset? startTimeFrom = default, DateTimeOffset? startTimeTo = default, CancellationToken cancellationToken = default) { string url = BuildRunsListUrl(jobId, limit, activeOnly, completedOnly, runType, expandTasks, startTimeFrom, startTimeTo); url += $"&offset={offset}"; return await HttpGet<RunList>(this.HttpClient, url, cancellationToken).ConfigureAwait(false); } public async Task<RunList> RunsList(string pageToken, long? jobId = default, int limit = 25, bool activeOnly = default, bool completedOnly = default, RunType? runType = default, bool expandTasks = default, DateTimeOffset? startTimeFrom = default, DateTimeOffset? startTimeTo = default, CancellationToken cancellationToken = default) { string url = BuildRunsListUrl(jobId, limit, activeOnly, completedOnly, runType, expandTasks, startTimeFrom, startTimeTo); url += string.IsNullOrEmpty(pageToken) ? string.Empty : $"&page_token={pageToken}"; return await HttpGet<RunList>(this.HttpClient, url, cancellationToken).ConfigureAwait(false); } public global::Azure.AsyncPageable<Run> RunsListPageable(long? jobId = null, int pageSize = 25, bool activeOnly = false, bool completedOnly = false, RunType? runType = null, bool expandTasks = false, DateTimeOffset? startTimeFrom = null, DateTimeOffset? startTimeTo = null, CancellationToken cancellationToken = default) { return new AsyncPageable<Run>(async (nextPageToken) => { var response = await RunsList(nextPageToken, jobId, pageSize, activeOnly, completedOnly, runType, expandTasks, startTimeFrom, startTimeTo, cancellationToken).ConfigureAwait(false); return (response.Runs.ToList(), response.HasMore, response.NextPageToken); }); } public async Task<(Run, RepairHistory)> RunsGet(long runId, bool includeHistory = default, bool includeResolvedValues = default, CancellationToken cancellationToken = default) { var url = $"{ApiVersion}/jobs/runs/get?run_id={runId}&include_history={JsonValue.Create(includeHistory)}&include_resolved_values={JsonValue.Create(includeResolvedValues)}"; var response = await HttpGet<JsonObject>(this.HttpClient, url, cancellationToken).ConfigureAwait(false); return (response.Deserialize<Run>(Options), response.Deserialize<RepairHistory>(Options)); } public async Task RunsCancel(long runId, CancellationToken cancellationToken = default) { var request = new { run_id = runId }; await HttpPost(this.HttpClient, $"{ApiVersion}/jobs/runs/cancel", request, cancellationToken).ConfigureAwait(false); } public async Task RunsDelete(long runId, CancellationToken cancellationToken = default) { var request = new { run_id = runId }; await HttpPost(this.HttpClient, $"{ApiVersion}/jobs/runs/delete", request, cancellationToken).ConfigureAwait(false); } public async Task<IEnumerable<ViewItem>> RunsExport(long runId, ViewsToExport viewsToExport = ViewsToExport.CODE, CancellationToken cancellationToken = default) { var url = $"{ApiVersion}/jobs/runs/export?run_id={runId}&views_to_export={viewsToExport}"; var viewItemList = await HttpGet<JsonObject>(this.HttpClient, url, cancellationToken).ConfigureAwait(false); return viewItemList.TryGetPropertyValue("views", out var views) ? views!.Deserialize<IEnumerable<ViewItem>>(Options) : Enumerable.Empty<ViewItem>(); } public async Task<RunOutput> RunsGetOutput(long runId, CancellationToken cancellationToken = default) { var url = $"{ApiVersion}/jobs/runs/get-output?run_id={runId}"; return await HttpGet<RunOutput>(this.HttpClient, url, cancellationToken).ConfigureAwait(false); } public async Task<long> RunsRepair(RepairRunInput repairRunInput, RunParameters repairRunParameters, CancellationToken cancellationToken = default) { var url = $"{ApiVersion}/jobs/runs/repair"; var inputNode = JsonSerializer.SerializeToNode(repairRunInput, Options)!.AsObject(); var parametersNode = JsonSerializer.SerializeToNode(repairRunParameters, Options)!.AsObject(); foreach (var kvp in parametersNode) { if (kvp.Value == null) continue; var node = kvp.Value!.ToJsonString(Options); inputNode.Add(kvp.Key, JsonNode.Parse(node)); } var response = await HttpPost<JsonObject, JsonObject>(this.HttpClient, url, inputNode, cancellationToken) .ConfigureAwait(false); return response["repair_id"]!.GetValue<long>(); } }