src/Microsoft.Azure.WebJobs.Extensions.Dapr/Bindings/Converters/DaprStateConverter.cs (134 lines of code) (raw):

// ------------------------------------------------------------ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. // ------------------------------------------------------------ namespace Microsoft.Azure.WebJobs.Extensions.Dapr.Bindings.Converters { using System; using System.IO; using System.Net; using System.Text; using System.Text.Json; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Functions.Extensions.Dapr.Core.Utils; using Microsoft.Azure.WebJobs; using Microsoft.Azure.WebJobs.Extensions.Dapr.Exceptions; using Microsoft.Azure.WebJobs.Extensions.Dapr.Services; using Microsoft.Azure.WebJobs.Extensions.Dapr.Utils; using Newtonsoft.Json.Linq; class DaprStateConverter : IAsyncConverter<DaprStateAttribute, DaprStateRecord>, IAsyncConverter<DaprStateAttribute, byte[]>, IAsyncConverter<DaprStateAttribute, string>, IAsyncConverter<DaprStateAttribute, Stream>, IAsyncConverter<DaprStateAttribute, JsonElement>, IAsyncConverter<DaprStateAttribute, JObject>, IAsyncConverter<DaprStateAttribute, JToken> { readonly IDaprServiceClient daprClient; public DaprStateConverter(IDaprServiceClient daprClient) { this.daprClient = daprClient; } async Task<DaprStateRecord> IAsyncConverter<DaprStateAttribute, DaprStateRecord>.ConvertAsync( DaprStateAttribute input, CancellationToken cancellationToken) { DaprStateRecord record = await this.GetStateRecordAsync(input, cancellationToken); using StreamReader reader = new StreamReader(record.ContentStream); string content = await reader.ReadToEndAsync(); if (!string.IsNullOrEmpty(content)) { record.Value = JsonDocument.Parse(content).RootElement; } return record; } public async Task<byte[]> ConvertAsync( DaprStateAttribute input, CancellationToken cancellationToken) { string content = await this.GetStringContentAsync(input, cancellationToken); if (string.IsNullOrEmpty(content)) { return Array.Empty<byte>(); } // Per Yaron, Dapr only supports JSON payloads over HTTP. // By default we assume that the payload is a JSON-serialized base64 string of bytes JsonElement json = JsonDocument.Parse(content).RootElement; byte[]? bytes; try { bytes = JsonSerializer.Deserialize<byte[]>(json); } catch (JsonException) { // Looks like it's not actually JSON - just return the raw bytes bytes = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(json, JsonUtils.DefaultSerializerOptions)); } return bytes ?? Array.Empty<byte>(); } Task<string> IAsyncConverter<DaprStateAttribute, string>.ConvertAsync( DaprStateAttribute input, CancellationToken cancellationToken) { return this.GetStringContentAsync(input, cancellationToken); } async Task<Stream> IAsyncConverter<DaprStateAttribute, Stream>.ConvertAsync( DaprStateAttribute input, CancellationToken cancellationToken) { DaprStateRecord record = await this.GetStateRecordAsync(input, cancellationToken); return record.ContentStream; } async Task<JsonElement> IAsyncConverter<DaprStateAttribute, JsonElement>.ConvertAsync( DaprStateAttribute input, CancellationToken cancellationToken) { string content = await this.GetStringContentAsync(input, cancellationToken); if (string.IsNullOrEmpty(content)) { return default; } return JsonDocument.Parse(content).RootElement; } async Task<JObject> IAsyncConverter<DaprStateAttribute, JObject>.ConvertAsync( DaprStateAttribute input, CancellationToken cancellationToken) { string content = await this.GetStringContentAsync(input, cancellationToken); if (string.IsNullOrEmpty(content)) { return default!; } return JObject.Parse(content); } async Task<JToken> IAsyncConverter<DaprStateAttribute, JToken>.ConvertAsync( DaprStateAttribute input, CancellationToken cancellationToken) { string content = await this.GetStringContentAsync(input, cancellationToken); if (string.IsNullOrEmpty(content)) { return default!; } return JToken.Parse(content); } private async Task<string> GetStringContentAsync(DaprStateAttribute input, CancellationToken cancellationToken) { DaprStateRecord stateRecord = await this.GetStateRecordAsync(input, cancellationToken); if (stateRecord.ContentStream.Length == 0) { throw new DaprException( HttpStatusCode.NotFound, ErrorCodes.ErrNoContent, $"Failed getting state with key {input.Key} from state store {input.StateStore}: state {input.Key} not found."); } var contentJson = await JsonDocument.ParseAsync(stateRecord.ContentStream); return JsonSerializer.Serialize(contentJson, JsonUtils.DefaultSerializerOptions); } private async Task<DaprStateRecord> GetStateRecordAsync(DaprStateAttribute input, CancellationToken cancellationToken) { DaprStateRecord stateRecord = await this.daprClient.GetStateAsync( input.DaprAddress, input.StateStore ?? throw new ArgumentException("No state store name was specified.", nameof(input.StateStore)), input.Key ?? throw new ArgumentException("No state store key was specified.", nameof(input.Key)), cancellationToken); return stateRecord; } } }