src/Microsoft.Azure.WebJobs.Extensions.Dapr/Bindings/DaprInvokeMethodAsyncCollector.cs (51 lines of code) (raw):
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
namespace Microsoft.Azure.WebJobs.Extensions.Dapr
{
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Extensions.Dapr.Core;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Dapr.Services;
class DaprInvokeMethodAsyncCollector : IAsyncCollector<InvokeMethodParameters>
{
readonly ConcurrentQueue<InvokeMethodParameters> requests = new ConcurrentQueue<InvokeMethodParameters>();
readonly DaprInvokeAttribute attr;
readonly IDaprServiceClient daprService;
public DaprInvokeMethodAsyncCollector(DaprInvokeAttribute attr, IDaprServiceClient daprService)
{
this.attr = attr;
this.daprService = daprService;
}
public Task AddAsync(InvokeMethodParameters item, CancellationToken cancellationToken = default)
{
if (item.AppId == null)
{
item.AppId = this.attr.AppId ?? throw new ArgumentException("A non-null app ID must be specified.");
}
if (item.MethodName == null)
{
item.MethodName = this.attr.MethodName ?? throw new ArgumentException("A non-null method name must be specified.");
}
if (item.HttpVerb == null)
{
item.HttpVerb = this.attr.HttpVerb ?? throw new ArgumentException("A non-null method verb must be specified.");
}
this.requests.Enqueue(item);
return Task.CompletedTask;
}
public async Task FlushAsync(CancellationToken cancellationToken = default)
{
while (this.requests.TryDequeue(out InvokeMethodParameters item))
{
await this.daprService.InvokeMethodAsync(
this.attr.DaprAddress,
item.AppId!,
item.MethodName!,
item.HttpVerb!,
item.Body,
cancellationToken);
}
}
}
}