netmf/NetMFLite/Receiver.cs (117 lines of code) (raw):
// ------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the ""License""); you may not use this
// file except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
// NON-INFRINGEMENT.
//
// See the Apache Version 2.0 License for specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------------------
namespace Amqp
{
using System;
using Amqp.Types;
/// <summary>
/// The delegate to be invoked when a message is received.
/// </summary>
/// <param name="receiver">The receiver.</param>
/// <param name="message">The received message.</param>
public delegate void OnMessage(Receiver receiver, Message message);
/// <summary>
/// A receiver link.
/// </summary>
public class Receiver : Link
{
Client client;
OnMessage onMessage;
uint credit;
byte restored;
byte flowThreshold;
uint deliveryCount;
bool deliveryReceived;
uint lastDeliveryId;
ByteBuffer messageBuffer;
internal Receiver(Client client, string name, string address)
{
this.Role = true;
this.Name = name;
this.client = client;
this.lastDeliveryId = uint.MaxValue;
}
/// <summary>
/// Starts the receiver link.
/// </summary>
/// <param name="credit">The link credit.</param>
/// <param name="onMessage">The message callback.</param>
public void Start(uint credit, OnMessage onMessage)
{
Fx.AssertAndThrow(ErrorCode.ReceiverStartInvalidState, this.State < 0xff);
this.credit = credit;
this.flowThreshold = this.credit > 512u ? byte.MaxValue : (byte)(this.credit / 2);
this.onMessage = onMessage;
this.client.SendFlow(this.Handle, this.deliveryCount, credit);
}
/// <summary>
/// Accepts a received message.
/// </summary>
/// <param name="message">The received message.</param>
public void Accept(Message message)
{
Fx.AssertAndThrow(ErrorCode.ReceiverAcceptInvalidState, this.State < 0xff);
if (!message.settled)
{
this.client.SendDisposition(true, message.deliveryId, true, new DescribedValue(0x24ul, new List()));
}
if (this.credit < uint.MaxValue)
{
lock (this)
{
this.credit++;
if (++this.restored >= this.flowThreshold)
{
this.restored = 0;
this.client.SendFlow(this.Handle, this.deliveryCount, this.credit);
}
}
}
}
/// <summary>
/// Closes the receiver.
/// </summary>
public void Close()
{
if (this.State < 0xff)
{
this.client.CloseLink(this);
}
}
internal override void OnAttach(List attach)
{
this.deliveryCount = (uint)attach[9];
}
internal override void OnFlow(List fields)
{
}
internal override void OnDisposition(uint first, uint last, DescribedValue state)
{
}
internal void OnTransfer(List transfer, ByteBuffer payload)
{
for (int i = transfer.Count; i < 11; i++)
{
transfer.Add(null);
}
bool more = transfer[5] != null && true.Equals(transfer[5]);
if (transfer[1] == null || (this.deliveryReceived && this.lastDeliveryId.Equals(transfer[1])))
{
AmqpBitConverter.WriteBytes(this.messageBuffer, payload.Buffer, payload.Offset, payload.Length);
}
else
{
lock (this)
{
Fx.AssertAndThrow(ErrorCode.InvalidCreditOnTransfer, this.credit > 0);
this.deliveryCount++;
if (this.credit < uint.MaxValue)
{
this.credit--;
}
}
this.lastDeliveryId = (uint)transfer[1];
this.deliveryReceived = true;
if (this.messageBuffer == null)
{
if (more)
{
this.messageBuffer = new ByteBuffer(payload.Length * 2, true);
AmqpBitConverter.WriteBytes(this.messageBuffer, payload.Buffer, payload.Offset, payload.Length);
}
else
{
this.messageBuffer = payload;
}
}
}
if (!more) // more
{
Message message = Message.Decode(this.messageBuffer);
this.messageBuffer = null;
message.deliveryId = this.lastDeliveryId;
message.settled = transfer[4] != null && true.Equals(transfer[4]);
this.onMessage(this, message);
}
}
}
}