in edge-hub/core/src/Microsoft.Azure.Devices.Routing.Core/endpoints/statemachine/EndpointExecutorFsm.cs [283:353]
/// <summary>
/// Sends the messages of the current send command that the checkpointer admits,
/// records the outcome in the executor's health fields, and then runs the
/// follow-up command through <c>RunInternalAsync</c>.
/// </summary>
/// <param name="thisPtr">The state machine instance whose current send command is processed.</param>
/// <returns>A task that completes once the follow-up command has been run.</returns>
static async Task EnterSendingAsync(EndpointExecutorFsm thisPtr)
{
    ICommand nextCommand;
    TimeSpan retryDelay;
    ICollection<IMessage> admitted = EmptyMessages;
    Stopwatch timer = Stopwatch.StartNew();

    // The configured timeout is scaled by the endpoint's fan-out factor.
    TimeSpan sendTimeout = TimeSpan.FromMilliseconds(
        thisPtr.config.Timeout.TotalMilliseconds * thisPtr.Endpoint.FanOutFactor);

    try
    {
        Preconditions.CheckNotNull(thisPtr.currentSendCommand);

        // Only messages accepted by the checkpointer's Admit predicate are sent.
        admitted = thisPtr.currentSendCommand.Messages
            .Where(thisPtr.Checkpointer.Admit)
            .ToArray();

        if (admitted.Count == 0)
        {
            // Nothing was admitted: checkpoint an empty result.
            Events.SendNone(thisPtr);
            nextCommand = Commands.Checkpoint(SinkResult<IMessage>.Empty);
        }
        else
        {
            Events.Send(thisPtr, thisPtr.currentSendCommand.Messages, admitted);

            ISinkResult<IMessage> sinkResult;
            using (var timeoutSource = new CancellationTokenSource(sendTimeout))
            {
                sinkResult = await thisPtr.processor.ProcessAsync(admitted, timeoutSource.Token);
            }

            if (!sinkResult.IsSuccessful)
            {
                // Keep the earliest unhealthy timestamp; only stamp one when none is recorded.
                if (!thisPtr.unhealthySince.HasValue)
                {
                    thisPtr.unhealthySince = Option.Some(thisPtr.systemTime.UtcNow);
                }

                Events.SendFailure(thisPtr, sinkResult, timer);
            }
            else
            {
                // A recorded failed-revival time at the moment of success is reported as a revival.
                if (thisPtr.lastFailedRevivalTime.HasValue)
                {
                    Events.Revived(thisPtr);
                }

                // Success clears the failed-revival time, the unhealthy timestamp and the retry counter.
                thisPtr.lastFailedRevivalTime = Option.None<DateTime>();
                thisPtr.unhealthySince = Option.None<DateTime>();
                thisPtr.retryAttempts = 0;
                Events.SendSuccess(thisPtr, admitted, sinkResult, timer);
            }

            // The result is checkpointed whether or not it was successful.
            nextCommand = Commands.Checkpoint(sinkResult);
        }
    }
    catch (Exception ex) when (thisPtr.ShouldRetry(ex, out retryDelay))
    {
        // ShouldRetry accepted the exception: fail with the retry delay it supplied.
        Events.SendFailureUnhandledException(thisPtr, admitted, timer, ex);
        if (!thisPtr.unhealthySince.HasValue)
        {
            thisPtr.unhealthySince = Option.Some(thisPtr.systemTime.UtcNow);
        }

        nextCommand = Commands.Fail(retryDelay);
    }
    catch (Exception ex)
    {
        // ShouldRetry rejected the exception: throw or die depending on config.ThrowOnDead.
        Events.SendFailureUnhandledException(thisPtr, admitted, timer, ex);
        if (!thisPtr.unhealthySince.HasValue)
        {
            thisPtr.unhealthySince = Option.Some(thisPtr.systemTime.UtcNow);
        }

        nextCommand = thisPtr.config.ThrowOnDead ? (ICommand)Commands.Throw(ex) : Commands.Die;
    }

    await RunInternalAsync(thisPtr, nextCommand);
}