in dotnet/src/Azure.Iot.Operations.Protocol/RPC/CommandInvoker.cs [713:771]
/// <summary>
/// Releases the resources held by this command invoker: detaches the message-received
/// callback, cancels every pending request, unsubscribes from all subscribed topics and,
/// optionally, disposes the underlying MQTT client.
/// </summary>
/// <param name="disposing">
/// When <c>true</c>, the underlying MQTT client is disposed as well; when <c>false</c>,
/// the client is left untouched.
/// </param>
/// <returns>A <see cref="ValueTask"/> that completes once cleanup has finished.</returns>
/// <remarks>
/// Calling this method again after it has completed is a no-op. Failures while
/// unsubscribing are traced and swallowed so that disposal continues.
/// </remarks>
protected virtual async ValueTask DisposeAsyncCore(bool disposing)
{
    if (_isDisposed)
    {
        return;
    }

    // Stop receiving responses before tearing down the pending-request bookkeeping.
    _mqttClient.ApplicationMessageReceivedAsync -= MessageReceivedCallbackAsync;

    // Cancel every in-flight request so that awaiting callers are released.
    lock (_requestIdMapLock)
    {
        foreach (KeyValuePair<string, ResponsePromise> responsePromise in _requestIdMap)
        {
            if (responsePromise.Value?.CompletionSource is { } completionSource)
            {
                SetCanceledSafe(completionSource);
            }
        }

        _requestIdMap.Clear();
    }

    try
    {
        MqttClientUnsubscribeOptions unsubscribeOptions = new();
        bool hasSubscribedTopics;

        // Check for topics and copy them under a single lock acquisition so the emptiness
        // check and the copied filters cannot disagree. The lock is released before the
        // await below.
        lock (_subscribedTopicsSetLock)
        {
            hasSubscribedTopics = _subscribedTopics.Count > 0;
            foreach (string subscribedTopic in _subscribedTopics)
            {
                unsubscribeOptions.TopicFilters.Add(subscribedTopic);
            }
        }

        if (hasSubscribedTopics)
        {
            MqttClientUnsubscribeResult unsubAck = await _mqttClient.UnsubscribeAsync(unsubscribeOptions, CancellationToken.None).ConfigureAwait(false);
            if (!unsubAck.IsUnsubAckSuccessful())
            {
                Trace.TraceError($"Failed to unsubscribe from the topic(s) for the command invoker of '{_commandName}'.");
            }
        }
    }
    catch (Exception e)
    {
        // Best effort: a failed unsubscribe must not prevent the rest of disposal.
        Trace.TraceWarning("Encountered an error while unsubscribing during disposal {0}", e);
    }

    lock (_subscribedTopicsSetLock)
    {
        _subscribedTopics.Clear();
    }

    if (disposing)
    {
        // This will disconnect and dispose the client if necessary
        await _mqttClient.DisposeAsync().ConfigureAwait(false);
    }

    _isDisposed = true;
}