in src/Proton.TestPeer/Matchers/Types/Messaging/CompositingDataSectionMatcher.cs [70:139]
/// <summary>
/// Compares the readable bytes of <paramref name="receivedBinary"/> against the next portion of
/// the expected payload, first consuming a Data section preamble (descriptor plus Binary
/// size header) whenever one is pending.
/// </summary>
/// <remarks>
/// Progress is tracked in instance fields (<c>expectDataSectionPreamble</c>,
/// <c>expectedCurrentDataSectionBytes</c>, <c>expectedRemainingBytes</c>), so each call resumes
/// from where the previous one stopped.
/// NOTE(review): presumably invoked once per incoming buffer of a payload that may be split
/// across several Data sections and transfers - confirm against the callers.
/// </remarks>
/// <param name="receivedBinary">Stream positioned at the next unread incoming byte.</param>
/// <returns>
/// <c>false</c> when the descriptor is not the expected code or symbol, the value is not encoded
/// as VBin8/VBin32, the encoded size is negative or larger than the bytes still expected, the
/// payload bytes differ from the expected ones, or bytes remain readable after the full expected
/// payload was matched while <c>ExpectTrailingBytes</c> is not set; <c>true</c> otherwise.
/// </returns>
protected override bool MatchesSafely(Stream receivedBinary)
{
    if (expectDataSectionPreamble)
    {
        object descriptor = ReadDescribedTypeEncoding(receivedBinary);

        if (!(DescriptorCode.Equals(descriptor) || DescriptorSymbol.Equals(descriptor)))
        {
            return false;
        }

        // Should be a Binary AMQP type with a length value and possibly some bytes
        EncodingCodes encodingCode = receivedBinary.ReadEncodingCode();

        if (encodingCode == EncodingCodes.VBin8)
        {
            expectedCurrentDataSectionBytes = receivedBinary.ReadUnsignedByte();
        }
        else if (encodingCode == EncodingCodes.VBin32)
        {
            expectedCurrentDataSectionBytes = receivedBinary.ReadInt();
        }
        else
        {
            decodingErrorDescription = "Expected to read a Binary Type but read encoding code: " + encodingCode;
            return false;
        }

        // The vbin32 size is an unsigned 32-bit value on the wire but is held in an int here.
        // A negative value would slip past the upper-bound check below and then be used as a
        // byte count for the reads further down, so reject it as a decoding error instead.
        if (expectedCurrentDataSectionBytes < 0)
        {
            decodingErrorDescription = "Expected encoded Binary to indicate a non-negative size " +
                                       "but read an encoded size of: " + expectedCurrentDataSectionBytes;
            return false;
        }

        if (expectedCurrentDataSectionBytes > expectedRemainingBytes)
        {
            decodingErrorDescription = "Expected encoded Binary to indicate size of: " + expectedRemainingBytes + ", " +
                                       "or less but read an encoded size of: " + expectedCurrentDataSectionBytes;
            return false;
        }

        expectDataSectionPreamble = false; // We got the current preamble
    }

    if (expectedRemainingBytes != 0)
    {
        // Only compare what is both still owed by the current section and actually readable now.
        int currentChunkSize = Math.Min(expectedCurrentDataSectionBytes, (int)receivedBinary.ReadableBytes());

        byte[] expectedValueChunk = expectedValueStream.ReadBytes(currentChunkSize);
        byte[] currentChunk = receivedBinary.ReadBytes(currentChunkSize);

        if (!expectedValueChunk.SequenceEqual(currentChunk))
        {
            return false;
        }

        expectedRemainingBytes -= currentChunkSize;
        expectedCurrentDataSectionBytes -= currentChunkSize;

        // Current section is exhausted but more payload is expected: the next bytes must start
        // with a new preamble, and the section size is unknown (-1) until it has been read.
        if (expectedRemainingBytes != 0 && expectedCurrentDataSectionBytes == 0)
        {
            expectDataSectionPreamble = true;
            expectedCurrentDataSectionBytes = -1;
        }
    }

    // Everything expected has been matched; leftover readable bytes are a failure unless allowed.
    if (expectedRemainingBytes == 0 && receivedBinary.IsReadable() && !ExpectTrailingBytes)
    {
        unexpectedTrailingBytes = true;
        return false;
    }

    return true;
}