in shims/amqpnetlite/src/amqp_types_test/Receiver/Receiver.cs [385:433]
/// <summary>
/// Connects to the broker at <c>brokerUrl</c>, attaches a receiver link to
/// <c>queueName</c> and receives messages until <c>nReceived</c> reaches
/// <c>nExpected</c>. Each message is accepted, decoded into a
/// <c>MessageValue</c>, checked against <c>amqpType</c> and appended to
/// <c>receivedMessageValues</c>. These members are declared outside this method.
/// </summary>
/// <exception cref="ApplicationException">
/// Thrown when the link does not attach within 10 seconds, when a message does
/// not arrive within 10 seconds, or when a decoded message body has an AMQP
/// type other than <c>amqpType</c>.
/// </exception>
public void run()
{
    // NOTE(review): the event is intentionally not disposed here; the attach
    // callback might presumably still fire after a timed-out wait - confirm.
    ManualResetEvent receiverAttached = new ManualResetEvent(false);
    OnAttached onReceiverAttached = (l, a) => { receiverAttached.Set(); };
    Address address = new Address(string.Format("amqp://{0}", brokerUrl));
    Connection connection = new Connection(address);

    // Tracks whether the receive loop and the orderly link/session shutdown
    // finished, so the finally block knows which close policy to apply.
    bool completed = false;
    try
    {
        Session session = new Session(connection);
        ReceiverLink receiverlink = new ReceiverLink(session,
            "Lite-amqp-types-test-receiver",
            new Source() { Address = queueName },
            onReceiverAttached);

        if (!receiverAttached.WaitOne(10000))
        {
            throw new ApplicationException(string.Format(
                "Time out attaching to test broker {0} queue {1}", brokerUrl, queueName));
        }

        while (nReceived < nExpected)
        {
            Message message = receiverlink.Receive(System.TimeSpan.FromSeconds(10));
            if (message == null)
            {
                throw new ApplicationException(string.Format(
                    "Time out receiving message {0} of {1}", nReceived+1, nExpected));
            }

            nReceived += 1;
            receiverlink.Accept(message);
            MessageValue mv = new MessageValue(message.Body);
            mv.Decode();
            if (mv.QpiditTypeName != amqpType)
            {
                throw new ApplicationException(string.Format
                    ("Incorrect AMQP type found in message body: expected: {0}; found: {1}",
                    amqpType, mv.QpiditTypeName));
            }
            receivedMessageValues.Add(mv);
        }

        receiverlink.Close();
        session.Close();
        completed = true;
    }
    finally
    {
        if (completed)
        {
            // Normal path: same close order as before; close errors propagate.
            connection.Close();
        }
        else
        {
            // Failure path: always release the connection, but never let a
            // secondary close failure hide the exception already in flight.
            try
            {
                connection.Close();
            }
            catch (Exception)
            {
                // Deliberately ignored: the original exception is more useful.
            }
        }
    }
}