in src/Libs/Common/Extensions/MessageClientExtensions.cs [129:197]
public static Task AckCompletedAsync(this IMessageClient client, CommandMessage command) =>
client.AckAsync(command, AckStatus.Completed);
public static Task AckCanceledAsync(this IMessageClient client, CommandMessage command) =>
client.AckAsync(command, AckStatus.Canceled);
public static Task AckFaultedAsync(this IMessageClient client, CommandMessage command, string error) =>
client.AckAsync(command, AckStatus.Faulted, error);
public static Task AckProgressAsync(this IMessageClient client, CommandMessage command, double progress) =>
client.AckAsync(command, AckStatus.Running, progress: progress);
public static async Task<Task> GetWhenAllAckAsync(
this MessageClient messageClient,
IReadOnlyCollection<string> senders,
string command,
CancellationToken cancellationToken)
{
var acks = new Dictionary<string, TaskCompletionSource<bool>>();
foreach (var sender in senders)
{
acks[sender] = new TaskCompletionSource<bool>();
}
await messageClient.WithHandlers(
MessageHandler.CreateAckHandler(
command,
ack =>
{
if (acks.TryGetValue(ack.Sender, out var tcs))
{
switch (ack.Status)
{
case AckStatus.Canceled:
tcs.TrySetCanceled();
break;
case AckStatus.Faulted:
tcs.TrySetException(new RpcFaultedException(ack.Error ?? string.Empty));
break;
case AckStatus.Completed:
tcs.TrySetResult(true);
break;
case AckStatus.Running:
default:
break;
}
}
return Task.CompletedTask;
}));
_ = CancelAfter(TimeSpan.FromMinutes(60));
return Task.WhenAll(acks.Select(x => x.Value.Task));
async Task CancelAfter(TimeSpan span)
{
try
{
await Task.Delay(span, cancellationToken);
}
finally
{
foreach (var pair in acks)
{
pair.Value.TrySetCanceled();
}
}
}
}