csharp/Microsoft.Azure.Databricks.Client/PipelinesApiClient.cs (201 lines of code) (raw):
using Microsoft.Azure.Databricks.Client.Models;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Text.Json.Nodes;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.Azure.Databricks.Client;
/// <summary>
/// Client for the REST endpoints rooted at <c>{ApiVersion}/pipelines</c>: listing, creating,
/// editing, deleting, resetting, starting and stopping pipelines, plus reading their updates and events.
/// </summary>
public class PipelinesApiClient : ApiClient, IPipelinesApi
{
    /// <summary>
    /// Creates a client that issues its requests through <paramref name="httpClient"/>.
    /// </summary>
    /// <param name="httpClient">HTTP client handed to the base <see cref="ApiClient"/>.</param>
    public PipelinesApiClient(HttpClient httpClient) : base(httpClient)
    {
    }

    /// <summary>
    /// Fetches one page of pipelines with a GET on <c>{ApiVersion}/pipelines</c>.
    /// </summary>
    /// <param name="maxResults">Value of the <c>max_results</c> query parameter; must be within 1..100.</param>
    /// <param name="pageToken">Optional <c>page_token</c> query parameter; omitted when null or empty.</param>
    /// <param name="cancellationToken">Token forwarded to the HTTP call.</param>
    /// <returns>The deserialized <see cref="PipelinesList"/> response.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxResults"/> is outside 1..100.</exception>
    public async Task<PipelinesList> List(int maxResults = 25, string pageToken = default, CancellationToken cancellationToken = default)
    {
        if (maxResults < 1 || maxResults > 100)
        {
            throw new ArgumentOutOfRangeException(nameof(maxResults), "limit must be between 1 and 100");
        }

        var requestUriSb = new StringBuilder($"{ApiVersion}/pipelines?max_results={maxResults}");
        AppendEscapedQueryParameter(requestUriSb, "page_token", pageToken);

        var requestUri = requestUriSb.ToString();
        return await HttpGet<PipelinesList>(this.HttpClient, requestUri, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Wraps <see cref="List"/> in an async pageable that requests one page per page token.
    /// </summary>
    /// <param name="pageSize">Passed to <see cref="List"/> as <c>maxResults</c> for every page.</param>
    /// <param name="cancellationToken">Token forwarded to every page request.</param>
    /// <returns>An async pageable sequence of <see cref="Pipeline"/> items.</returns>
    public global::Azure.AsyncPageable<Pipeline> ListPageable(int pageSize = 25, CancellationToken cancellationToken = default)
    {
        return new AsyncPageable<Pipeline>(
            async (string pageToken) =>
            {
                var response = await List(pageSize, pageToken, cancellationToken).ConfigureAwait(false);

                // A response without a pipelines collection becomes an empty page instead of a NullReferenceException.
                return (response.Pipelines?.ToList() ?? new List<Pipeline>(), response.HasMore, response.NextPageToken);
            }
        );
    }

    /// <summary>
    /// POSTs <paramref name="pipelineSpecification"/> to <c>{ApiVersion}/pipelines</c> with the
    /// <c>dry_run</c> and <c>allow_duplicate_names</c> fields added to the serialized specification.
    /// </summary>
    /// <param name="pipelineSpecification">Specification serialized as the request body.</param>
    /// <param name="dryRun">Value sent as <c>dry_run</c>. Note that the default is <c>true</c>.</param>
    /// <param name="allowDuplicateNames">Value sent as <c>allow_duplicate_names</c>.</param>
    /// <param name="cancellationToken">Token forwarded to the HTTP call.</param>
    /// <returns>
    /// The response's <c>pipeline_id</c> and its <c>effective_settings</c> deserialized as a
    /// <see cref="PipelineSpecification"/>; either element is null when the response lacks that field.
    /// </returns>
    public async Task<(string, PipelineSpecification)> Create(
        PipelineSpecification pipelineSpecification,
        bool dryRun = true,
        bool allowDuplicateNames = false,
        CancellationToken cancellationToken = default)
    {
        var requestUri = $"{ApiVersion}/pipelines";

        var request = JsonSerializer.SerializeToNode(pipelineSpecification, Options)!.AsObject();
        request.Add("dry_run", dryRun);
        request.Add("allow_duplicate_names", allowDuplicateNames);

        var response = await HttpPost<JsonObject, JsonObject>
                (this.HttpClient, requestUri, request, cancellationToken)
            .ConfigureAwait(false);

        return (
            response["pipeline_id"]?.GetValue<string>(),
            response["effective_settings"]?.AsObject().Deserialize<PipelineSpecification>(Options)
        );
    }

    /// <summary>
    /// PUTs <paramref name="pipelineSpecification"/> to <c>{ApiVersion}/pipelines/{pipelineId}</c>.
    /// </summary>
    /// <param name="pipelineId">Identifier placed in the request path.</param>
    /// <param name="pipelineSpecification">Specification serialized as the request body.</param>
    /// <param name="allowDuplicateNames">Value sent as <c>allow_duplicate_names</c>.</param>
    /// <param name="expectedLastModified">When set, sent as <c>expected_last_modified</c>; otherwise the field is omitted.</param>
    /// <param name="cancellationToken">Token forwarded to the HTTP call.</param>
    public async Task Edit(
        string pipelineId,
        PipelineSpecification pipelineSpecification,
        bool allowDuplicateNames = false,
        DateTimeOffset? expectedLastModified = null,
        CancellationToken cancellationToken = default)
    {
        var requestUri = $"{ApiVersion}/pipelines/{pipelineId}";

        var request = JsonSerializer.SerializeToNode(pipelineSpecification, Options)!.AsObject();
        request.Add("allow_duplicate_names", allowDuplicateNames);

        if (expectedLastModified != null)
        {
            request.Add("expected_last_modified", expectedLastModified);
        }

        await HttpPut(this.HttpClient, requestUri, request, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Sends a DELETE to <c>{ApiVersion}/pipelines/{pipelineId}</c>.
    /// </summary>
    /// <param name="pipelineId">Identifier placed in the request path.</param>
    /// <param name="cancellationToken">Token forwarded to the HTTP call.</param>
    public async Task Delete(string pipelineId, CancellationToken cancellationToken = default)
    {
        var requestUri = $"{ApiVersion}/pipelines/{pipelineId}";
        await HttpDelete(this.HttpClient, requestUri, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// POSTs an empty object to <c>{ApiVersion}/pipelines/{pipelineId}/reset</c>.
    /// </summary>
    /// <param name="pipelineId">Identifier placed in the request path.</param>
    /// <param name="cancellationToken">Token forwarded to the HTTP call.</param>
    public async Task Reset(string pipelineId, CancellationToken cancellationToken = default)
    {
        var requestUri = $"{ApiVersion}/pipelines/{pipelineId}/reset";
        await HttpPost(this.HttpClient, requestUri, new { }, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// POSTs an empty object to <c>{ApiVersion}/pipelines/{pipelineId}/stop</c>.
    /// </summary>
    /// <param name="pipelineId">Identifier placed in the request path.</param>
    /// <param name="cancellationToken">Token forwarded to the HTTP call.</param>
    public async Task Stop(string pipelineId, CancellationToken cancellationToken = default)
    {
        var requestUri = $"{ApiVersion}/pipelines/{pipelineId}/stop";
        await HttpPost(this.HttpClient, requestUri, new { }, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// GETs <c>{ApiVersion}/pipelines/{pipelineId}</c>.
    /// </summary>
    /// <param name="pipelineId">Identifier placed in the request path.</param>
    /// <param name="cancellationToken">Token forwarded to the HTTP call.</param>
    /// <returns>The deserialized <see cref="Pipeline"/> response.</returns>
    public async Task<Pipeline> Get(string pipelineId, CancellationToken cancellationToken = default)
    {
        var requestUri = $"{ApiVersion}/pipelines/{pipelineId}";
        return await HttpGet<Pipeline>(this.HttpClient, requestUri, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// GETs <c>{ApiVersion}/pipelines/{pipelineId}/updates/{updateId}</c> and unwraps the
    /// <c>update</c> entry of the response object.
    /// </summary>
    /// <param name="pipelineId">Pipeline identifier placed in the request path.</param>
    /// <param name="updateId">Update identifier placed in the request path.</param>
    /// <param name="cancellationToken">Token forwarded to the HTTP call.</param>
    /// <returns>The <see cref="PipelineUpdate"/> stored under the <c>update</c> key.</returns>
    /// <exception cref="KeyNotFoundException">The response has no <c>update</c> entry.</exception>
    public async Task<PipelineUpdate> GetUpdate(string pipelineId, string updateId, CancellationToken cancellationToken = default)
    {
        var requestUri = $"{ApiVersion}/pipelines/{pipelineId}/updates/{updateId}";
        var response = await HttpGet<Dictionary<string, PipelineUpdate>>(this.HttpClient, requestUri, cancellationToken).ConfigureAwait(false);
        return response["update"];
    }

    /// <summary>
    /// Fetches one page of updates with a GET on <c>{ApiVersion}/pipelines/{pipelineId}/updates</c>.
    /// </summary>
    /// <param name="pipelineId">Identifier placed in the request path.</param>
    /// <param name="maxResults">Value of the <c>max_results</c> query parameter (not range-checked here).</param>
    /// <param name="pageToken">Optional <c>page_token</c> query parameter; omitted when null or empty.</param>
    /// <param name="untilUpdateId">Optional <c>until_update_id</c> query parameter; omitted when null or empty.</param>
    /// <param name="cancellationToken">Token forwarded to the HTTP call.</param>
    /// <returns>The deserialized <see cref="PipelineUpdatesList"/> response.</returns>
    public async Task<PipelineUpdatesList> ListUpdates(
        string pipelineId,
        int maxResults = 25,
        string pageToken = null,
        string untilUpdateId = null,
        CancellationToken cancellationToken = default)
    {
        var requestUriSb = new StringBuilder($"{ApiVersion}/pipelines/{pipelineId}/updates?max_results={maxResults}");
        AppendEscapedQueryParameter(requestUriSb, "page_token", pageToken);
        AppendEscapedQueryParameter(requestUriSb, "until_update_id", untilUpdateId);

        var requestUri = requestUriSb.ToString();
        return await HttpGet<PipelineUpdatesList>(this.HttpClient, requestUri, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Wraps <see cref="ListUpdates"/> in an async pageable that requests one page per page token.
    /// </summary>
    /// <param name="pipelineId">Identifier passed to every page request.</param>
    /// <param name="pageSize">Passed to <see cref="ListUpdates"/> as <c>maxResults</c> for every page.</param>
    /// <param name="untilUpdateId">Passed through unchanged to every page request.</param>
    /// <param name="cancellationToken">Token forwarded to every page request.</param>
    /// <returns>An async pageable sequence of <see cref="PipelineUpdate"/> items.</returns>
    public global::Azure.AsyncPageable<PipelineUpdate> ListUpdatesPageable(
        string pipelineId,
        int pageSize = 25,
        string untilUpdateId = null,
        CancellationToken cancellationToken = default)
    {
        return new AsyncPageable<PipelineUpdate>(
            async (string pageToken) =>
            {
                var response = await ListUpdates(pipelineId, pageSize, pageToken, untilUpdateId, cancellationToken).ConfigureAwait(false);

                // A response without an updates collection becomes an empty page instead of a NullReferenceException.
                return (response.Updates?.ToList() ?? new List<PipelineUpdate>(), response.HasMore, response.NextPageToken);
            }
        );
    }

    /// <summary>
    /// Starts an update by POSTing to <c>{ApiVersion}/pipelines/{pipelineId}/updates</c>.
    /// </summary>
    /// <param name="pipelineId">Identifier placed in the request path.</param>
    /// <param name="fullRefresh">Sent as <c>full_refresh</c>, encoded as the string <c>"true"</c> or <c>"false"</c>.</param>
    /// <param name="cause">Sent as <c>cause</c> using the enum member's name.</param>
    /// <param name="refreshSelection">When not null, serialized into <c>refresh_selection</c>.</param>
    /// <param name="fullRefreshSelection">When not null, serialized into <c>full_refresh_selection</c>.</param>
    /// <param name="cancellationToken">Token forwarded to the HTTP call.</param>
    /// <returns>The <c>update_id</c> string from the response.</returns>
    public async Task<string> Start(
        string pipelineId,
        bool fullRefresh = false,
        PipelineUpdateCause cause = PipelineUpdateCause.API_CALL,
        IEnumerable<string> refreshSelection = default,
        IEnumerable<string> fullRefreshSelection = default,
        CancellationToken cancellationToken = default)
    {
        var requestUri = $"{ApiVersion}/pipelines/{pipelineId}/updates";

        // full_refresh stays a lowercase string to preserve the existing wire format;
        // the invariant culture keeps the casing independent of the caller's locale.
        var requestDict = new Dictionary<string, string>
        {
            { "full_refresh", fullRefresh.ToString().ToLowerInvariant() },
            { "cause", cause.ToString() }
        };

        var request = JsonSerializer.SerializeToNode(requestDict, Options)!.AsObject();

        if (refreshSelection != null)
        {
            var refreshSelectionJson = JsonSerializer.SerializeToNode(refreshSelection, Options);
            request.Add("refresh_selection", refreshSelectionJson);
        }

        if (fullRefreshSelection != null)
        {
            var fullRefreshSelectionJson = JsonSerializer.SerializeToNode(fullRefreshSelection, Options);
            request.Add("full_refresh_selection", fullRefreshSelectionJson);
        }

        var response = await HttpPost<JsonObject, JsonObject>(this.HttpClient, requestUri, request, cancellationToken).ConfigureAwait(false);
        return response["update_id"].GetValue<string>();
    }

    /// <summary>
    /// Fetches one page of events with a GET on <c>{ApiVersion}/pipelines/{pipelineId}/events</c>.
    /// </summary>
    /// <param name="pipelineId">Identifier placed in the request path.</param>
    /// <param name="maxResults">Value of the <c>max_results</c> query parameter (not range-checked here).</param>
    /// <param name="orderBy">Optional <c>order_by</c> query parameter; omitted when null or empty.</param>
    /// <param name="filter">Optional <c>filter</c> query parameter; omitted when null or empty.</param>
    /// <param name="pageToken">Optional <c>page_token</c> query parameter; omitted when null or empty.</param>
    /// <param name="cancellationToken">Token forwarded to the HTTP call.</param>
    /// <returns>The deserialized <see cref="PipelineEventsList"/> response.</returns>
    public async Task<PipelineEventsList> ListEvents(
        string pipelineId,
        int maxResults = 25,
        string orderBy = default,
        string filter = default,
        string pageToken = default,
        CancellationToken cancellationToken = default)
    {
        var requestUriSb = new StringBuilder($"{ApiVersion}/pipelines/{pipelineId}/events?max_results={maxResults}");
        AppendEscapedQueryParameter(requestUriSb, "order_by", orderBy);
        AppendEscapedQueryParameter(requestUriSb, "filter", filter);
        AppendEscapedQueryParameter(requestUriSb, "page_token", pageToken);

        var requestUri = requestUriSb.ToString();
        return await HttpGet<PipelineEventsList>(this.HttpClient, requestUri, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Wraps <see cref="ListEvents"/> in an async pageable that requests one page per page token.
    /// </summary>
    /// <param name="pipelineId">Identifier passed to every page request.</param>
    /// <param name="pageSize">Passed to <see cref="ListEvents"/> as <c>maxResults</c> for every page.</param>
    /// <param name="orderBy">Passed through unchanged to every page request.</param>
    /// <param name="filter">Passed through unchanged to every page request.</param>
    /// <param name="cancellationToken">Token forwarded to every page request.</param>
    /// <returns>An async pageable sequence of <see cref="PipelineEvent"/> items.</returns>
    public global::Azure.AsyncPageable<PipelineEvent> ListEventsPageable(
        string pipelineId,
        int pageSize = 25,
        string orderBy = default,
        string filter = default,
        CancellationToken cancellationToken = default)
    {
        return new AsyncPageable<PipelineEvent>(
            async (string pageToken) =>
            {
                var response = await ListEvents(pipelineId, pageSize, orderBy, filter, pageToken, cancellationToken).ConfigureAwait(false);

                // A response without an events collection becomes an empty page instead of a NullReferenceException.
                return (response.Events?.ToList() ?? new List<PipelineEvent>(), response.HasMore, response.NextPageToken);
            }
        );
    }

    /// <summary>
    /// Appends an additional query parameter to a URI that already contains a query string.
    /// The value is percent-encoded so that characters such as '&amp;', '+', '=', '#' and spaces
    /// cannot alter the structure of the query. Nothing is appended when the value is null or empty.
    /// </summary>
    /// <param name="uriBuilder">Builder holding the URI under construction.</param>
    /// <param name="name">Parameter name, appended as-is.</param>
    /// <param name="value">Raw (unencoded) parameter value.</param>
    private static void AppendEscapedQueryParameter(StringBuilder uriBuilder, string name, string value)
    {
        if (string.IsNullOrEmpty(value))
        {
            return;
        }

        uriBuilder.Append('&').Append(name).Append('=').Append(Uri.EscapeDataString(value));
    }
}