in src/Proton.TestPeer/Driver/AMQPTestDriver.cs [465:528]
/// <summary>
/// Handles an incoming AMQP performative by checking it against the next element
/// of the test script.
/// </summary>
/// <remarks>
/// All work is done while holding <c>mutex</c>. Every performative other than a
/// Heartbeat increments <c>performativeCount</c>, and an Open is also stored in
/// <c>remoteOpen</c>. The next script element is then dequeued:
/// a missing element, or one that is not a <see cref="ScriptedExpectation"/>, is
/// reported through <c>SignalFailure</c>; otherwise the performative is invoked
/// against the expectation. Finally <c>ProcessScript</c> is called with the
/// dequeued element.
/// </remarks>
/// <param name="frameSize">Frame size value forwarded to the expectation.</param>
/// <param name="amqp">The performative that was received.</param>
/// <param name="channel">Channel value forwarded to the expectation.</param>
/// <param name="payload">Payload bytes forwarded to the expectation.</param>
/// <exception cref="UnexpectedPerformativeError">
/// Rethrown when the dequeued expectation does not match and is not optional.
/// </exception>
/// <exception cref="Exception">
/// Any other exception raised while invoking the expectation is logged, reported
/// through <c>SignalFailure</c> and rethrown.
/// </exception>
public void HandlePerformative(uint frameSize, PerformativeDescribedType amqp, ushort channel, byte[] payload)
{
    mutex.WaitOne();
    try
    {
        // The counting step lives inside the try block so that a failure here
        // (for example a null performative or a failed cast) still reaches the
        // finally block and releases the mutex instead of leaving it held.
        switch (amqp.Type)
        {
            case PerformativeType.Heartbeat:
                // Heartbeats are not counted.
                break;
            case PerformativeType.Open:
                remoteOpen = (Open)amqp;
                performativeCount++;
                break;
            default:
                performativeCount++;
                break;
        }

        if (!script.TryDequeue(out IScriptedElement scriptEntry))
        {
            SignalFailure(new AssertionError(
                "Received AMQP performative when not expecting any input: " + amqp?.GetType().Name));
        }
        else if (scriptEntry is ScriptedExpectation expectation)
        {
            try
            {
                amqp.Invoke(expectation, frameSize, payload, channel, this);
            }
            catch (UnexpectedPerformativeError unexpected)
            {
                if (expectation.IsOptional)
                {
                    // The optional expectation did not match, so retry the same
                    // performative against the next script element. This re-enters
                    // the mutex on the same thread (presumably a recursive
                    // System.Threading.Mutex - confirm).
                    // NOTE(review): the recursive call runs the counting switch
                    // again, so the performative is counted once per skipped
                    // optional expectation - confirm this is intended.
                    HandlePerformative(frameSize, amqp, channel, payload);
                }
                else
                {
                    logger.LogWarning(unexpected.Message);
                    SignalFailure(unexpected);
                    throw;
                }
            }
            catch (Exception assertion)
            {
                logger.LogWarning(assertion.Message);
                SignalFailure(assertion);
                throw;
            }
        }
        else
        {
            SignalFailure(new AssertionError(
                "Received AMQP performative when not expecting to perform some action or other script item: " +
                amqp?.GetType().Name));
        }

        // NOTE(review): this is reached with a null scriptEntry when nothing was
        // dequeued and SignalFailure returned normally - confirm ProcessScript
        // tolerates that.
        ProcessScript(scriptEntry);
    }
    finally
    {
        mutex.ReleaseMutex();
    }
}