netmf/NetMFLite/Sender.cs (68 lines of code) (raw):
// ------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the ""License""); you may not use this
// file except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
// NON-INFRINGEMENT.
//
// See the Apache Version 2.0 License for specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------------------
namespace Amqp
{
using System;
using Amqp.Types;
/// <summary>
/// A sender link.
/// </summary>
public class Sender : Link
{
const int defaultTimeout = 60000;
Client client;
uint credit;
uint deliveryCount;
uint deliveryId;
DescribedValue deliveryState;
internal Sender(Client client, string name, string address)
{
this.Role = false;
this.Name = name;
this.client = client;
}
/// <summary>
/// Sends a message.
/// </summary>
/// <param name="message">The message.</param>
public void Send(Message message)
{
this.Send(message, defaultTimeout);
}
/// <summary>
/// Sends a message.
/// </summary>
/// <param name="message">The message.</param>
/// <param name="timeout">The timeout in seconds.</param>
public void Send(Message message, int timeout)
{
Fx.AssertAndThrow(ErrorCode.SenderSendInvalidState, this.State < 0xff);
this.client.Wait(o => ((Sender)o).credit == 0, this, 60000);
lock (this)
{
if (this.credit < uint.MaxValue)
{
this.credit--;
}
}
this.deliveryState = null;
this.deliveryId = this.client.Send(this, message, timeout == 0);
this.deliveryCount++;
this.client.Wait(o => ((Sender)o).deliveryState == null, this, timeout);
if (!object.Equals(this.deliveryState.Descriptor, 0x24ul))
{
throw new Exception(this.deliveryState.Value.ToString());
}
}
/// <summary>
/// Closes the sender.
/// </summary>
public void Close()
{
this.client.CloseLink(this);
}
internal override void OnAttach(List attach)
{
}
internal override void OnFlow(List flow)
{
lock (this)
{
uint dc = flow[5] == null ? uint.MaxValue : (uint)flow[5];
uint lc = flow[6] == null ? uint.MaxValue : (uint)flow[6];
this.credit = lc < uint.MaxValue ? dc + lc - this.credit : lc;
}
}
internal override void OnDisposition(uint first, uint last, DescribedValue deliveryState)
{
if (this.deliveryId >= first && this.deliveryId <= last)
{
this.deliveryState = deliveryState;
}
}
}
}