in src/AmqpMessageReader.cs [119:216]
/// <summary>
/// Reads the sections found in <paramref name="buffer"/> one after another and passes each of them to
/// <paramref name="sectionHandler"/>, stopping when the buffer has no bytes left or the handler returns false.
/// </summary>
/// <typeparam name="T1">Type of the first state argument forwarded to the handler.</typeparam>
/// <typeparam name="T2">Type of the second state argument forwarded to the handler.</typeparam>
/// <param name="t1">First state argument, forwarded unchanged to the handler.</param>
/// <param name="t2">Second state argument, forwarded unchanged to the handler.</param>
/// <param name="buffer">Buffer to read from. A reference is held on it for the duration of the read, and its
/// offset is moved back to the value it had on entry before the method returns.</param>
/// <param name="sections">Flags selecting the sections to decode. A section whose flag is not selected is
/// skipped over and reported to the handler without a decoded value.</param>
/// <param name="sectionHandler">Callback invoked once per section; returning false ends the read.</param>
/// <param name="context">Not referenced by this method.</param>
/// <param name="error">Initialised to null on entry; receives the error produced by a failed read.</param>
/// <returns>true when reading ran to the end of the buffer or was stopped by the handler; false when the
/// buffer reference could not be taken or a section could not be read.</returns>
public static bool TryRead<T1, T2>(T1 t1, T2 t2, ByteBuffer buffer, SectionFlag sections,
    Func<T1, T2, Section, bool> sectionHandler, string context, out Error error)
{
    error = null;

    if (!buffer.TryAddReference())
    {
        // The delivery is already disposed. Treat it as decode error.
        error = GetDecodeError(Resources.AmqpBufferAlreadyReclaimed);
        return false;
    }

    // Remembered so the buffer can be rewound no matter how the read ends.
    int initialOffset = buffer.Offset;

    try
    {
        bool keepReading = true;
        while (keepReading && buffer.Length > 0)
        {
            int sectionStart = buffer.Offset;

            SectionInfo info;
            if (!TryReadSectionInfo(buffer, out info, out error))
            {
                return false;
            }

            AmqpDescribed decoded;
            int consumed;
            bool isRequested = (info.Flag & sections) > 0;
            if (!isRequested)
            {
                // Caller did not ask for this section: step over its bytes without decoding them.
                FormatCode formatCode = AmqpEncoding.ReadFormatCode(buffer);
                if (formatCode != FormatCode.Null)
                {
                    error = info.Scanner(formatCode, buffer);
                    if (error != null)
                    {
                        return false;
                    }
                }

                decoded = null;
                consumed = buffer.Offset - sectionStart;
            }
            else
            {
                decoded = info.Ctor();
                if (section_IsData(decoded))
                {
                    // Data sections are decoded through the binary codec path.
                    decoded.Value = AmqpCodec.DecodeBinary(buffer, false);
                }
                else
                {
                    decoded.DecodeValue(buffer);
                }

                consumed = buffer.Offset - sectionStart;
                decoded.Offset = sectionStart;
                decoded.Length = consumed;
            }

            keepReading = sectionHandler(t1, t2, new Section(info.Flag, sectionStart, consumed, decoded));
        }

        return true;
    }
    catch (SerializationException serializationException)
    {
        error = GetDecodeError(serializationException.Message);
    }
    catch (AmqpException amqpException)
    {
        error = amqpException.Error;
    }
    catch (Exception unexpected)
    {
        // Fatal exceptions are never converted into a decode error.
        if (Fx.IsFatal(unexpected))
        {
            throw;
        }

        error = GetDecodeError(unexpected.Message);
    }
    finally
    {
        // Rewind first, then give back the reference taken at the top.
        buffer.Seek(initialOffset);
        buffer.RemoveReference();
    }

    return false;
}

// True when the section carries the Data descriptor code.
private static bool section_IsData(AmqpDescribed section)
{
    return section.DescriptorCode == Data.Code;
}