in iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs [1120:1188]
/// <summary>
/// Sends a twin request and waits for the response whose request id matches <paramref name="rid"/>.
/// </summary>
/// <param name="request">The twin request message to send.</param>
/// <param name="rid">The request id used to correlate incoming twin responses with this request.</param>
/// <param name="cancellationToken">Token used to cancel the send and the wait for the response.</param>
/// <returns>The twin response message whose request id matches <paramref name="rid"/>.</returns>
/// <exception cref="IotHubThrottledException">The matching response carried status 429.</exception>
/// <exception cref="IotHubException">The matching response carried any other status of 300 or above.</exception>
/// <exception cref="TimeoutException">No matching response was recorded before the wait of <c>TwinTimeout</c> ended.</exception>
/// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was canceled.</exception>
private async Task<Message> SendTwinRequestAsync(Message request, string rid, CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    // Signaled by the handler below once a matching response (or a failure) has been recorded.
    using var responseReceived = new SemaphoreSlim(0);
    Message response = null;
    ExceptionDispatchInfo responseException = null;

    void OnTwinResponse(Message possibleResponse)
    {
        try
        {
            // Ignore anything that is not a twin response for this particular request.
            if (!ParseResponseTopic(possibleResponse.MqttTopicName, out string receivedRid, out int status)
                || rid != receivedRid)
            {
                return;
            }

            if (status >= 300)
            {
                // The Hub team is refactoring the retriable status codes without breaking changes to the existing ones.
                // It can be expected that we may bring more retriable codes here in the future.
                // Retry for Http status code 429 (too many requests)
                if (status == 429)
                {
                    throw new IotHubThrottledException($"Request {rid} was throttled by the server");
                }

                throw new IotHubException($"Request {rid} returned status {status}", isTransient: false);
            }

            response = possibleResponse;
            responseReceived.Release();
        }
        catch (Exception e)
        {
            // Capture the failure so it can be rethrown from the awaiting method with its original
            // stack trace, then wake the waiter.
            responseException = ExceptionDispatchInfo.Capture(e);
            responseReceived.Release();
        }
    }

    try
    {
        // Subscribe before sending so that a response arriving immediately is not missed.
        _twinResponseEvent += OnTwinResponse;

        await SendEventAsync(request, cancellationToken).ConfigureAwait(false);
        await responseReceived.WaitAsync(TwinTimeout, cancellationToken).ConfigureAwait(false);

        // A failure recorded by the handler takes precedence over everything else.
        responseException?.Throw();

        // Neither a response nor a failure was recorded: the wait ended because of the timeout.
        if (response == null)
        {
            throw new TimeoutException($"Response for message {rid} not received");
        }

        return response;
    }
    finally
    {
        _twinResponseEvent -= OnTwinResponse;
    }
}