src/FrameDecoder.cs (64 lines of code) (raw):

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Microsoft.Azure.Amqp
{
    using System;
    using System.Diagnostics.CodeAnalysis;
    using Microsoft.Azure.Amqp.Encoding;
    using Microsoft.Azure.Amqp.Framing;

    /// <summary>
    /// Cuts a stream of incoming bytes into per-frame buffers and hands each completed
    /// frame buffer to a worker. A frame that is only partially present in one input
    /// buffer is kept in <c>currentFrameBuffer</c> and continued on the next call.
    /// </summary>
    sealed class FrameDecoder
    {
        // Upper bound accepted for the size announced by an incoming frame.
        int maxFrameSize;

        // Frame that has been allocated but not yet handed to the worker; null when no
        // frame is in progress.
        ByteBuffer currentFrameBuffer;

        /// <summary>
        /// Creates a decoder that rejects frames announcing a size above <paramref name="maxFrameSize"/>.
        /// </summary>
        /// <param name="maxFrameSize">Largest frame size this decoder accepts.</param>
        public FrameDecoder(int maxFrameSize)
        {
            this.maxFrameSize = maxFrameSize;
        }

        /// <summary>
        /// Decodes a protocol header from <paramref name="buffer"/> when enough bytes are available.
        /// </summary>
        /// <param name="buffer">Input bytes; decoded in place via <c>ProtocolHeader.Decode</c>.</param>
        /// <returns>
        /// The decoded header, or <c>null</c> when <paramref name="buffer"/> holds fewer than
        /// <c>AmqpConstants.ProtocolHeaderSize</c> bytes.
        /// </returns>
        public ProtocolHeader ExtractProtocolHeader(ByteBuffer buffer)
        {
            if (buffer.Length >= AmqpConstants.ProtocolHeaderSize)
            {
                ProtocolHeader header = new ProtocolHeader();
                header.Decode(buffer);
                return header;
            }

            return null;
        }

        /// <summary>
        /// Consumes bytes from <paramref name="buffer"/>, first continuing any frame left unfinished
        /// by a previous call and then starting new frames for as long as at least
        /// <c>Frame.HeaderSize</c> bytes remain. Every frame buffer that becomes complete is passed to
        /// <paramref name="bufferHandler"/>. Trailing bytes shorter than <c>Frame.HeaderSize</c> are
        /// not consumed by this method.
        /// </summary>
        /// <param name="buffer">Input bytes to split into frames.</param>
        /// <param name="bufferHandler">Worker whose <c>DoWork</c> receives each completed frame buffer.</param>
        /// <exception cref="AmqpException">
        /// Thrown with <c>AmqpErrorCode.FramingError</c> when a frame announces a size below
        /// <c>Frame.HeaderSize</c> or above the configured maximum frame size.
        /// </exception>
        public void ExtractFrameBuffers(ByteBuffer buffer, SerializedWorker<ByteBuffer> bufferHandler)
        {
            ByteBuffer pending = this.currentFrameBuffer;
            if (pending != null)
            {
                // NOTE(review): Size is presumably the space still unfilled in the pending
                // frame - confirm against ByteBuffer.
                int continuation = Math.Min(pending.Size, buffer.Length);
                MoveBytes(buffer, pending, continuation);

                if (pending.Size == 0)
                {
                    // Clear the field before dispatching so the decoder no longer refers to
                    // the frame once the worker has it.
                    this.currentFrameBuffer = null;
                    bufferHandler.DoWork(pending);
                }
            }

            while (buffer.Length >= Frame.HeaderSize)
            {
                int announcedSize = AmqpCodec.GetFrameSize(buffer);
                bool sizeAcceptable = announcedSize >= Frame.HeaderSize && announcedSize <= this.maxFrameSize;
                if (!sizeAcceptable)
                {
                    throw new AmqpException(
                        AmqpErrorCode.FramingError,
                        CommonResources.GetString(CommonResources.InvalidFrameSize, announcedSize, this.maxFrameSize));
                }

                int available = Math.Min(announcedSize, buffer.Length);

                // Publish the new frame in the field before copying, matching the order in
                // which state becomes visible if the copy fails.
                ByteBuffer frame = new ByteBuffer(announcedSize, false);
                this.currentFrameBuffer = frame;
                MoveBytes(buffer, frame, available);

                if (available != announcedSize)
                {
                    // Only part of the frame has arrived; it stays in currentFrameBuffer
                    // and is continued on the next call.
                    return;
                }

                this.currentFrameBuffer = null;
                bufferHandler.DoWork(frame);
            }
        }

        // Writes count bytes from source (starting at its current Offset) into destination,
        // then calls source.Complete(count) to mark them as handled.
        static void MoveBytes(ByteBuffer source, ByteBuffer destination, int count)
        {
            AmqpBitConverter.WriteBytes(destination, source.Buffer, source.Offset, count);
            source.Complete(count);
        }
    }
}