public static Task AckCompletedAsync()

in src/Libs/Common/Extensions/MessageClientExtensions.cs [129:197]


        /// <summary>
        /// Sends an acknowledgement for <paramref name="command"/> with status
        /// <see cref="AckStatus.Completed"/>.
        /// </summary>
        /// <param name="client">Message client used to send the acknowledgement.</param>
        /// <param name="command">Command being acknowledged.</param>
        /// <returns>The task returned by the underlying <c>AckAsync</c> call.</returns>
        public static Task AckCompletedAsync(this IMessageClient client, CommandMessage command)
        {
            return client.AckAsync(command, AckStatus.Completed);
        }

        /// <summary>
        /// Sends an acknowledgement for <paramref name="command"/> with status
        /// <see cref="AckStatus.Canceled"/>.
        /// </summary>
        /// <param name="client">Message client used to send the acknowledgement.</param>
        /// <param name="command">Command being acknowledged.</param>
        /// <returns>The task returned by the underlying <c>AckAsync</c> call.</returns>
        public static Task AckCanceledAsync(this IMessageClient client, CommandMessage command)
        {
            return client.AckAsync(command, AckStatus.Canceled);
        }

        /// <summary>
        /// Sends an acknowledgement for <paramref name="command"/> with status
        /// <see cref="AckStatus.Faulted"/>, passing along the supplied error text.
        /// </summary>
        /// <param name="client">Message client used to send the acknowledgement.</param>
        /// <param name="command">Command being acknowledged.</param>
        /// <param name="error">Error text forwarded to <c>AckAsync</c>.</param>
        /// <returns>The task returned by the underlying <c>AckAsync</c> call.</returns>
        public static Task AckFaultedAsync(this IMessageClient client, CommandMessage command, string error)
        {
            return client.AckAsync(command, AckStatus.Faulted, error);
        }

        /// <summary>
        /// Sends an acknowledgement for <paramref name="command"/> with status
        /// <see cref="AckStatus.Running"/>, passing along the supplied progress value.
        /// </summary>
        /// <param name="client">Message client used to send the acknowledgement.</param>
        /// <param name="command">Command being acknowledged.</param>
        /// <param name="progress">Progress value forwarded to <c>AckAsync</c> via its <c>progress</c> argument.</param>
        /// <returns>The task returned by the underlying <c>AckAsync</c> call.</returns>
        public static Task AckProgressAsync(this IMessageClient client, CommandMessage command, double progress)
        {
            return client.AckAsync(command, AckStatus.Running, progress: progress);
        }

        /// <summary>
        /// Registers an ack handler for <paramref name="command"/> and returns a task that
        /// completes once every sender in <paramref name="senders"/> has reported a terminal
        /// ack status.
        /// </summary>
        /// <remarks>
        /// The outer task completes when the handler has been registered through
        /// <c>WithHandlers</c>; the inner task tracks the acks themselves. Per sender:
        /// <see cref="AckStatus.Completed"/> completes it successfully,
        /// <see cref="AckStatus.Canceled"/> cancels it, and <see cref="AckStatus.Faulted"/>
        /// faults it with an <see cref="RpcFaultedException"/> carrying the ack's error text
        /// (or an empty string when none was supplied). Any other status is ignored, as are
        /// acks from senders not listed in <paramref name="senders"/>.
        /// Senders still pending after 60 minutes, or when
        /// <paramref name="cancellationToken"/> is cancelled, are moved to the canceled state.
        /// </remarks>
        /// <param name="messageClient">Client on which the ack handler is registered.</param>
        /// <param name="senders">Senders whose acks are awaited; duplicates collapse to one entry.</param>
        /// <param name="command">Command whose acks are observed.</param>
        /// <param name="cancellationToken">Token that cancels all still-pending acks.</param>
        /// <returns>
        /// A task that, once the handler is registered, yields the aggregate task over all
        /// per-sender acks.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="senders"/> is <c>null</c>.</exception>
        public static async Task<Task> GetWhenAllAckAsync(
            this MessageClient messageClient,
            IReadOnlyCollection<string> senders,
            string command,
            CancellationToken cancellationToken)
        {
            if (senders == null)
            {
                throw new ArgumentNullException(nameof(senders));
            }

            // Upper bound on how long pending acks are kept alive.
            var timeout = TimeSpan.FromMinutes(60);

            var acks = new Dictionary<string, TaskCompletionSource<bool>>(senders.Count);

            foreach (var sender in senders)
            {
                // RunContinuationsAsynchronously keeps the awaiting caller's continuations
                // from running inline on the thread that invokes the ack handler.
                acks[sender] = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
            }

            await messageClient.WithHandlers(
                MessageHandler.CreateAckHandler(
                    command,
                    ack =>
                    {
                        if (acks.TryGetValue(ack.Sender, out var tcs))
                        {
                            switch (ack.Status)
                            {
                                case AckStatus.Canceled:
                                    tcs.TrySetCanceled();
                                    break;
                                case AckStatus.Faulted:
                                    tcs.TrySetException(new RpcFaultedException(ack.Error ?? string.Empty));
                                    break;
                                case AckStatus.Completed:
                                    tcs.TrySetResult(true);
                                    break;
                                case AckStatus.Running:
                                default:
                                    // Non-terminal statuses leave the sender pending.
                                    break;
                            }
                        }
                        return Task.CompletedTask;
                    }));

            var allAcks = Task.WhenAll(acks.Values.Select(tcs => tcs.Task));

            // Fire-and-forget watchdog; it never throws because it waits via Task.WhenAny.
            _ = CancelPendingAfterAsync(timeout);
            return allAcks;

            // Cancels every still-pending ack once the timeout elapses or the caller's token
            // fires, and releases the timer as soon as all acks have arrived.
            async Task CancelPendingAfterAsync(TimeSpan span)
            {
                using (var linked = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
                {
                    try
                    {
                        var delay = Task.Delay(span, linked.Token);

                        // WhenAny does not throw when the delay is cancelled, and it returns
                        // early when all acks are already in, so the timer is not held for
                        // the full timeout.
                        await Task.WhenAny(allAcks, delay).ConfigureAwait(false);

                        // Stop the timer if it is still running.
                        linked.Cancel();
                    }
                    finally
                    {
                        // No-op for acks that already reached a terminal state.
                        foreach (var pending in acks.Values)
                        {
                            pending.TrySetCanceled();
                        }
                    }
                }
            }
        }