// csharp/Microsoft.Azure.Databricks.Client.Sample/SampleProgram.Cluster.cs (135 lines of code) (raw):
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
using Microsoft.Azure.Databricks.Client.Models;
using Polly;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text.Json;
using System.Threading.Tasks;
using Policy = Polly.Policy;
namespace Microsoft.Azure.Databricks.Client.Sample;
internal static partial class SampleProgram
{
/// <summary>
/// Polls a cluster until it reports the RUNNING, ERROR or TERMINATED state,
/// printing the observed state and state message after every successful query.
/// </summary>
/// <remarks>
/// The wait has no upper bound: the policy retries forever. A failed query is retried when it
/// throws <see cref="WebException"/>, a <see cref="ClientApiException"/> with status 502 or 500
/// or whose message carries the TEMPORARILY_UNAVAILABLE error code, or a
/// <see cref="TaskCanceledException"/> whose token was not cancelled. Any other exception
/// is not handled by the policy.
/// </remarks>
/// <param name="clusterClient">Clusters API client used to query the cluster.</param>
/// <param name="clusterId">Identifier of the cluster to watch.</param>
/// <param name="pollIntervalSeconds">Delay, in seconds, between two consecutive queries.</param>
/// <returns>A task that completes once the cluster is in one of the three states listed above.</returns>
private static async Task WaitForCluster(IClustersApi clusterClient, string clusterId, int pollIntervalSeconds = 15)
{
    // The delay is the same for every attempt, so it is computed once up front.
    var pollDelay = TimeSpan.FromSeconds(pollIntervalSeconds);

    // Only failed queries are logged; a retry caused by a not-yet-settled state carries no exception.
    static void ReportFailedQuery(Exception failure)
    {
        if (failure is null)
        {
            return;
        }

        Console.WriteLine($"[{DateTime.UtcNow:s}] Failed to query cluster info - {failure}");
    }

    // One poll: fetch the cluster, print what was seen, and hand the result to the policy.
    async Task<ClusterInfo> QueryClusterAsync()
    {
        var snapshot = await clusterClient.Get(clusterId);
        Console.WriteLine($"[{DateTime.UtcNow:s}]Cluster:{clusterId}\tState:{snapshot.State}\tMessage:{snapshot.StateMessage}");
        return snapshot;
    }

    var pollingPolicy = Policy
        .Handle<WebException>()
        .Or<ClientApiException>(apiError => apiError.StatusCode == HttpStatusCode.BadGateway)
        .Or<ClientApiException>(apiError => apiError.StatusCode == HttpStatusCode.InternalServerError)
        .Or<ClientApiException>(apiError => apiError.Message.Contains("\"error_code\":\"TEMPORARILY_UNAVAILABLE\""))
        // A TaskCanceledException without a cancelled token is a web request timeout.
        .Or<TaskCanceledException>(timeout => !timeout.CancellationToken.IsCancellationRequested)
        // Keep polling while the cluster is in any state other than the three settled ones.
        .OrResult<ClusterInfo>(cluster => !(cluster.State is ClusterState.RUNNING or ClusterState.ERROR or ClusterState.TERMINATED))
        .WaitAndRetryForeverAsync(
            _ => pollDelay,
            (outcome, _) => ReportFailedQuery(outcome.Exception));

    await pollingPolicy.ExecuteAsync(() => QueryClusterAsync());
}
/// <summary>
/// Walks through the Clusters API: lists node types (first 10), runtime versions and existing
/// clusters, then creates, inspects and deletes three sample clusters in turn
/// (single-node, Photon, and high-concurrency).
/// </summary>
/// <remarks>
/// Every cluster created here is deleted in a <c>finally</c> block, so a failure part-way through
/// a scenario does not leave the cluster behind. If both the scenario and the delete call fail,
/// the exception from the delete call is the one that propagates.
/// </remarks>
/// <param name="client">Databricks client whose <c>Clusters</c> API is exercised.</param>
/// <returns>A task that completes when all three scenarios have run and their clusters are deleted.</returns>
private static async Task TestClustersApi(DatabricksClient client)
{
    // Single place for the "announce, then delete" step shared by all three scenarios.
    async Task DeleteCluster(string id)
    {
        Console.WriteLine($"Deleting cluster {id}");
        await client.Clusters.Delete(id);
    }

    Console.WriteLine("Listing node types (take 10)");
    var nodeTypes = await client.Clusters.ListNodeTypes();
    foreach (var nodeType in nodeTypes.Take(10))
    {
        Console.WriteLine($"\t{nodeType.NodeTypeId}\tMemory: {nodeType.MemoryMb} MB\tCores: {nodeType.NumCores}\tAvailable Quota: {nodeType.ClusterCloudProviderNodeInfo.AvailableCoreQuota}");
    }

    Console.WriteLine("Listing Databricks runtime versions");
    var sparkVersions = await client.Clusters.ListSparkVersions();
    foreach (var (key, name) in sparkVersions)
    {
        Console.WriteLine($"\t{key}\t\t{name}");
    }

    Console.WriteLine("Listing existing clusters");
    var clusters = await client.Clusters.List();
    foreach (var cluster in clusters)
    {
        Console.WriteLine($"\t{cluster.ClusterId}\t\t{cluster.ClusterName}");
    }

    // Scenario 1: single-node cluster that is edited once it has settled.
    Console.WriteLine("Creating standard cluster");
    var clusterConfig = ClusterAttributes
        .GetNewClusterConfiguration("SampleProgram cluster")
        .WithRuntimeVersion(RuntimeVersions.Runtime_10_4)
        .WithAutoTermination(30)
        .WithClusterLogConf("dbfs:/logs/")
        .WithNodeType(NodeTypes.Standard_D3_v2)
        .WithClusterMode(ClusterMode.SingleNode)
        .WithDockerImage("databricksruntime/standard:latest")
        .WithCredentialPassThrough(true, DatabricksUserName);

    var clusterId = await client.Clusters.Create(clusterConfig);
    try
    {
        var createdCluster = await client.Clusters.Get(clusterId);
        Console.WriteLine(JsonSerializer.Serialize(createdCluster, Options));
        await WaitForCluster(client.Clusters, clusterId);

        Console.WriteLine($"Editing cluster {clusterId}");
        createdCluster.CustomTags = new Dictionary<string, string> { { "TestingTagKey", "TestingTagValue" } };
        createdCluster.WithCredentialPassThrough(false);
        await client.Clusters.Edit(clusterId, createdCluster);
        await WaitForCluster(client.Clusters, clusterId);
    }
    finally
    {
        await DeleteCluster(clusterId);
    }

    // Scenario 2: Photon cluster. This configuration does not call WithAutoTermination,
    // so deleting it on the failure path matters most here.
    Console.WriteLine("Creating Photon cluster");
    clusterConfig = ClusterAttributes
        .GetNewClusterConfiguration("SampleProgram cluster")
        .WithRuntimeVersion(RuntimeVersions.Runtime_10_4_PHOTON)
        .WithClusterMode(ClusterMode.Standard)
        .WithNumberOfWorkers(1)
        .WithNodeType(NodeTypes.Standard_E8s_v3);

    clusterId = await client.Clusters.Create(clusterConfig);
    try
    {
        var createdCluster = await client.Clusters.Get(clusterId);
        Console.WriteLine(JsonSerializer.Serialize(createdCluster, Options));
        await WaitForCluster(client.Clusters, clusterId);
    }
    finally
    {
        await DeleteCluster(clusterId);
    }

    // Scenario 3: high-concurrency cluster that is terminated, then has its event log read.
    Console.WriteLine("Creating HighConcurrency cluster");
    clusterConfig = ClusterAttributes
        .GetNewClusterConfiguration("SampleProgram cluster")
        .WithRuntimeVersion(RuntimeVersions.Runtime_10_4)
        .WithAutoScale(1, 3)
        .WithAutoTermination(30)
        .WithClusterLogConf("dbfs:/logs/")
        .WithNodeType(NodeTypes.Standard_D3_v2)
        .WithClusterMode(ClusterMode.HighConcurrency)
        .WithTableAccessControl(true);

    clusterId = await client.Clusters.Create(clusterConfig);
    try
    {
        var createdCluster = await client.Clusters.Get(clusterId);
        Console.WriteLine(JsonSerializer.Serialize(createdCluster, Options));
        await WaitForCluster(client.Clusters, clusterId);

        Console.WriteLine($"Terminating cluster {clusterId}");
        await client.Clusters.Terminate(clusterId);
        await WaitForCluster(client.Clusters, clusterId, 5);

        Console.WriteLine($"Getting all events from cluster {clusterId}");
        EventsResponse eventsResponse = null;
        var events = new List<ClusterEvent>();
        do
        {
            // The first request passes only the cluster id; each later request replays the
            // paging parameters returned in the previous response's NextPage.
            var nextPage = eventsResponse?.NextPage;
            eventsResponse = await client.Clusters.Events(
                clusterId,
                nextPage?.StartTime,
                nextPage?.EndTime,
                nextPage?.Order,
                nextPage?.EventTypes,
                nextPage?.Offset,
                nextPage?.Limit
            );
            events.AddRange(eventsResponse.Events);
        } while (eventsResponse.HasNextPage);

        Console.WriteLine($"{events.Count} events retrieved from cluster {clusterId}.");
        Console.WriteLine("Top 10 events: ");
        foreach (var e in events.Take(10))
        {
            Console.WriteLine($"\t[{e.Timestamp:s}] {e.Type}\t{e.Details.User}");
        }
    }
    finally
    {
        await DeleteCluster(clusterId);
    }
}
}