csharp/Microsoft.Azure.Databricks.Client/DbfsApiClient.cs (130 lines of code) (raw):
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
using Microsoft.Azure.Databricks.Client.Models;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Text.Json;
using System.Text.Json.Nodes;
using System.Threading;
using System.Threading.Tasks;
using FileInfo = Microsoft.Azure.Databricks.Client.Models.FileInfo;
namespace Microsoft.Azure.Databricks.Client;
public sealed class DbfsApiClient : ApiClient, IDbfsApi
{
/// <summary>
/// Initializes a new instance of the <see cref="DbfsApiClient"/> class.
/// </summary>
/// <param name="httpClient">The HTTP client.</param>
public DbfsApiClient(HttpClient httpClient) : base(httpClient)
{
}
public async Task<long> Create(string path, bool overwrite, CancellationToken cancellationToken = default)
{
var request = new { path, overwrite };
var response = await HttpPost<dynamic, FileHandle>(this.HttpClient, $"{ApiVersion}/dbfs/create", request, cancellationToken).ConfigureAwait(false);
return response.Handle;
}
public async Task AddBlock(long fileHandle, byte[] data, CancellationToken cancellationToken = default)
{
var request = new { handle = fileHandle, data };
await HttpPost(this.HttpClient, $"{ApiVersion}/dbfs/add-block", request, cancellationToken).ConfigureAwait(false);
}
public async Task Close(long fileHandle, CancellationToken cancellationToken = default)
{
var handle = new FileHandle { Handle = fileHandle };
await HttpPost(this.HttpClient, $"{ApiVersion}/dbfs/close", handle, cancellationToken).ConfigureAwait(false);
}
public async Task Upload(string path, bool overwrite, Stream stream, CancellationToken cancellationToken = default)
{
const int mb = 1024 * 1024;
var handle = await this.Create(path, overwrite, cancellationToken).ConfigureAwait(false);
var originalPosition = 0L;
if (stream.CanSeek)
{
originalPosition = stream.Position;
stream.Position = 0;
}
var buffer = new byte[mb];
try
{
int bytesRead;
while ((bytesRead = await stream.ReadAsync(buffer.AsMemory(0, mb), cancellationToken)) > 0)
{
var contents = new byte[bytesRead];
Array.Copy(buffer, contents, bytesRead);
await this.AddBlock(handle, contents, cancellationToken).ConfigureAwait(false);
}
await this.Close(handle, cancellationToken).ConfigureAwait(false);
}
finally
{
if (stream.CanSeek)
{
stream.Position = originalPosition;
}
}
}
public async Task Delete(string path, bool recursive, CancellationToken cancellationToken = default)
{
var request = new { path, recursive };
await HttpPost(this.HttpClient, $"{ApiVersion}/dbfs/delete", request, cancellationToken).ConfigureAwait(false);
}
public async Task<FileInfo> GetStatus(string path, CancellationToken cancellationToken = default)
{
var encodedPath = WebUtility.UrlEncode(path);
var url = $"{ApiVersion}/dbfs/get-status?path={encodedPath}";
var result = await HttpGet<FileInfo>(this.HttpClient, url, cancellationToken).ConfigureAwait(false);
return result;
}
public async Task<IEnumerable<FileInfo>> List(string path, CancellationToken cancellationToken = default)
{
var encodedPath = WebUtility.UrlEncode(path);
var url = $"{ApiVersion}/dbfs/list?path={encodedPath}";
var result = await HttpGet<JsonObject>(this.HttpClient, url, cancellationToken).ConfigureAwait(false);
return result.TryGetPropertyValue("files", out var files)
? files.Deserialize<IEnumerable<FileInfo>>(Options)
: Enumerable.Empty<FileInfo>();
}
public async Task Mkdirs(string path, CancellationToken cancellationToken = default)
{
var request = new { path };
await HttpPost(this.HttpClient, $"{ApiVersion}/dbfs/mkdirs", request, cancellationToken).ConfigureAwait(false);
}
public async Task Move(string sourcePath, string destinationPath, CancellationToken cancellationToken = default)
{
var request = new { source_path = sourcePath, destination_path = destinationPath };
await HttpPost(this.HttpClient, $"{ApiVersion}/dbfs/move", request, cancellationToken).ConfigureAwait(false);
}
public async Task Put(string path, byte[] contents, bool overwrite, CancellationToken cancellationToken = default)
{
var form = new MultipartFormDataContent
{
{new StringContent(path), "path"},
{new StringContent(overwrite.ToString().ToLowerInvariant()), "overwrite"},
{new ByteArrayContent(contents), "contents"}
};
var response = await this.HttpClient.PostAsync($"{ApiVersion}/dbfs/put", form, cancellationToken).ConfigureAwait(false);
if (!response.IsSuccessStatusCode)
{
throw CreateApiException(response);
}
}
public async Task<FileReadBlock> Read(string path, long offset, long length, CancellationToken cancellationToken = default)
{
var encodedPath = WebUtility.UrlEncode(path);
var url = $"{ApiVersion}/dbfs/read?path={encodedPath}&offset={offset}&length={length}";
var result = await HttpGet<FileReadBlock>(this.HttpClient, url, cancellationToken).ConfigureAwait(false);
return result;
}
public async Task Download(string path, Stream stream, CancellationToken cancellationToken = default)
{
const int mb = 1024 * 1024;
var totalBytesRead = 0L;
var block = await Read(path, totalBytesRead, mb, cancellationToken);
while (block.BytesRead > 0)
{
totalBytesRead += block.BytesRead;
await stream.WriteAsync(block.Data.AsMemory(0, block.Data.Length), cancellationToken);
block = await Read(path, totalBytesRead, mb, cancellationToken);
}
}
}