netmf/NetMFLite/Client.cs (696 lines of code) (raw):
// ------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
// file except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
// NON-INFRINGEMENT.
//
// See the Apache Version 2.0 License for specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------------------
namespace Amqp
{
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using Amqp.Types;
#if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || MF_FRAMEWORK_VERSION_V4_4)
using Microsoft.SPOT.Net.Security;
#elif (NANOFRAMEWORK_1_0)
using System.Net.Security;
#endif
/// <summary>
/// The event handler that is invoked when an AMQP object is closed unexpectedly.
/// </summary>
/// <param name="client">The client.</param>
/// <param name="link">The link. If null, the error applies to the client object.</param>
/// <param name="error">The error condition due to which the object was closed.</param>
public delegate void ErrorEventHandler(Client client, Link link, Symbol error);
/// <summary>
/// A predicate evaluated against a caller supplied state object. Client.Wait keeps
/// blocking for as long as the predicate returns true.
/// </summary>
/// <param name="state">The object the predicate inspects.</param>
/// <returns>true while the caller should keep waiting; otherwise false.</returns>
delegate bool Condition(object state);
/// <summary>
/// A Client is a channel to communicate with an AMQP peer. It manages an AMQP
/// connection and a session within it.
/// </summary>
public class Client
{
// Connection/session progress flags that are OR-ed into the "state" field.
const byte OpenSent = 1;
const byte BeginSent = 1 << 1;
const byte EndSent = 1 << 2;
const byte CloseSent = 1 << 3;
const byte OpenReceived = 1 << 4;
const byte BeginReceived = 1 << 5;
const byte EndReceived = 1 << 6;
const byte CloseReceived = 1 << 7;
// Link progress flags that are OR-ed into Link.State.
const byte AttachSent = 1;
const byte AttachReceived = 2;
const byte DetachSent = 4;
const byte DetachReceived = 8;
// Prefix of the link names generated by CreateSender/CreateReceiver.
const string Name = "netmf-lite";
// Max frame size announced in the open frame; also the initial value of maxFrameSize.
const uint MaxFrameSize = 1024;
// Initial value of both the incoming and the outgoing session window.
const uint DefaultWindowSize = 100u;
// Upper bound (milliseconds) applied to the heart beat timer period.
const uint MaxIdleTimeout = 120000;
// Size of the link table; the begin frame announces MaxLinks - 1 as handle-max.
const int MaxLinks = 8;
// Bytes reserved in front of an encoded message for the frame header and transfer performative.
const int TransferFramePrefixSize = 30;
static readonly TimerCallback onHeartBeatTimer = OnHeartBeatTimer;
// Counter appended to generated link names to keep them distinct.
static int linkId;
// Bit set of the connection flags above. 0 until Connect runs; forced to 0xff when the
// pump thread or the heart beat timer hits a failure.
byte state;
// connection state
// Set by the pump thread after every frame (or failure) to wake callers blocked in Wait.
AutoResetEvent signal;
NetworkStream transport;
// Set whenever a frame is written; cleared on each heart beat tick. The timer only sends
// an empty frame when nothing was written since its previous tick.
bool sendActive;
// Frame size limit used when a message is split into transfer frames.
int maxFrameSize;
uint idleTimeout;
string hostName;
Timer heartBeatTimer;
// session state
uint outWindow;
uint nextOutgoingId;
uint inWindow;
uint nextIncomingId;
// Delivery id assigned to the next outgoing message.
uint deliveryId;
// Link table. AddLink stores a link in the first free slot; when the peer's attach
// arrives the link is moved to the index equal to the peer's handle.
Link[] links;
/// <summary>
/// The event that is raised when any AMQP object is closed unexpectedly.
/// </summary>
public event ErrorEventHandler OnError;
/// <summary>
/// Gets or sets the idle timeout of the connection, in milliseconds.
/// </summary>
/// <remarks>Once the connection is open the <see cref="Client"/> runs a heart beat timer
/// that sends an empty frame before this interval elapses, so the connection stays active.
/// The value can only be changed before the <see cref="Client"/> is connected.
/// </remarks>
public uint IdleTimeout
{
    get
    {
        return this.idleTimeout;
    }

    set
    {
        // Only a client that has not started connecting may be reconfigured.
        bool notConnected = this.state == 0;
        Fx.AssertAndThrow(ErrorCode.ClientNotAllowedAfterConnect, notConnected);
        this.idleTimeout = value;
    }
}
/// <summary>
/// Gets or sets the host name sent when the connection is opened. Set it when the
/// virtual host differs from the network host passed to the Connect method.
/// </summary>
public string HostName
{
    get
    {
        return this.hostName;
    }

    set
    {
        // Only a client that has not started connecting may be reconfigured.
        bool notConnected = this.state == 0;
        Fx.AssertAndThrow(ErrorCode.ClientNotAllowedAfterConnect, notConnected);
        this.hostName = value;
    }
}
/// <summary>
/// Initializes a Client with the default frame size, session windows and link table,
/// and with the idle timeout disabled.
/// </summary>
public Client()
{
    // synchronization and link bookkeeping
    this.signal = new AutoResetEvent(false);
    this.links = new Link[MaxLinks];

    // session windows
    this.outWindow = DefaultWindowSize;
    this.inWindow = DefaultWindowSize;

    // connection settings
    this.idleTimeout = uint.MaxValue; // no idle timeout
    this.maxFrameSize = (int)MaxFrameSize;
}
/// <summary>
/// Establish a connection and a session for the client.
/// </summary>
/// <remarks>
/// When <paramref name="userName"/> is not null a SASL PLAIN exchange is performed before
/// the AMQP protocol header is sent. If any step of the handshake fails, the transport is
/// closed, the client returns to the not-connected state and the exception is rethrown.
/// </remarks>
/// <param name="host">The network host name or IP address to connect.</param>
/// <param name="port">The port to connect.</param>
/// <param name="useSsl">If true, use secure socket.</param>
/// <param name="userName">If set, the user name for authentication.</param>
/// <param name="password">If set, the password for authentication.</param>
public void Connect(string host, int port, bool useSsl, string userName, string password)
{
    // A failure here leaves nothing to clean up, so it stays outside the try block.
    this.transport = Connect(host, port, useSsl);

    try
    {
        byte[] header = new byte[8] { (byte)'A', (byte)'M', (byte)'Q', (byte)'P', 0, 1, 0, 0 };
        byte[] retHeader;
        if (userName != null)
        {
            // protocol id 3 selects the SASL layer
            header[4] = 3;
            this.transport.Write(header, 0, 8);
            Fx.DebugPrint(true, 0, "AMQP", new List { string.Concat(header[5], header[6], header[7]) }, header[4]);

            // PLAIN response: an empty authorization id, then user name and password,
            // each preceded by a zero byte
            byte[] b1 = Encoding.UTF8.GetBytes(userName);
            byte[] b2 = Encoding.UTF8.GetBytes(password ?? string.Empty);
            byte[] b = new byte[1 + b1.Length + 1 + b2.Length];
            Array.Copy(b1, 0, b, 1, b1.Length);
            Array.Copy(b2, 0, b, b1.Length + 2, b2.Length);
            List saslInit = new List() { new Symbol("PLAIN"), b };
            this.WriteFrame(1, 0, 0x41, saslInit);
            Fx.DebugPrint(true, 0, "sasl-init", saslInit, "mechanism");
            this.transport.Flush();

            retHeader = this.ReadFixedSizeBuffer(8);
            Fx.DebugPrint(false, 0, "AMQP", new List { string.Concat(retHeader[5], retHeader[6], retHeader[7]) }, retHeader[4]);
            Fx.AssertAndThrow(ErrorCode.ClientInitializeHeaderCheckFailed, AreHeaderEqual(header, retHeader));

            List body = this.ReadFrameBody(1, 0, 0x40);
            Fx.DebugPrint(false, 0, "sasl-mechanisms", body, "server-mechanisms");
            Fx.AssertAndThrow(ErrorCode.ClientInitializeWrongBodyCount, body.Count > 0);
            Symbol[] mechanisms = GetSymbolMultiple(body[0]);
            Fx.AssertAndThrow(ErrorCode.ClientInitializeWrongSymbol, Array.IndexOf(mechanisms, new Symbol("PLAIN")) >= 0);

            body = this.ReadFrameBody(1, 0, 0x44);
            Fx.AssertAndThrow(ErrorCode.ClientInitializeWrongBodyCount, body.Count > 0);
            Fx.DebugPrint(false, 0, "sasl-outcome", body, "code");
            Fx.AssertAndThrow(ErrorCode.ClientInitializeSaslFailed, body[0].Equals((byte)0)); // sasl-outcome.code = OK

            // switch back to the plain AMQP protocol id for the next header
            header[4] = 0;
        }

        this.transport.Write(header, 0, 8);
        Fx.DebugPrint(true, 0, "AMQP", new List { string.Concat(header[5], header[6], header[7]) }, header[4]);

        // perform open
        this.state |= OpenSent;
        var open = new List() { Guid.NewGuid().ToString(), this.hostName ?? host, MaxFrameSize, (ushort)0 };
        this.WriteFrame(0, 0, 0x10, open);
        Fx.DebugPrint(true, 0, "open", open, "container-id", "host-name", "max-frame-size", "channel-max", "idle-time-out");

        // perform begin
        this.state |= BeginSent;
        var begin = new List() { null, this.nextOutgoingId, this.inWindow, this.outWindow, (uint)(this.links.Length - 1) };
        this.WriteFrame(0, 0, 0x11, begin);
        Fx.DebugPrint(true, 0, "begin", begin, "remote-channel", "next-outgoing-id", "incoming-window", "outgoing-window", "handle-max");

        // open and begin are pipelined; the peer's protocol header is only read now
        retHeader = this.ReadFixedSizeBuffer(8);
        Fx.DebugPrint(false, 0, "AMQP", new List { string.Concat(retHeader[5], retHeader[6], retHeader[7]) }, retHeader[4]);
        Fx.AssertAndThrow(ErrorCode.ClientInitializeHeaderCheckFailed, AreHeaderEqual(header, retHeader));

        // all further frames are handled by the pump thread
        new Thread(this.PumpThread).Start();
    }
    catch
    {
        // The handshake did not complete and no pump thread owns the transport yet:
        // release the socket and drop the OpenSent/BeginSent flags so the client does
        // not look connected to CreateSender/CreateReceiver/Close.
        this.transport.Close();
        this.state = 0;
        throw;
    }
}
/// <summary>
/// Creates a sending link on the client's session and sends its attach frame.
/// </summary>
/// <param name="address">The address of the node where messages are sent.</param>
/// <returns>A Sender object.</returns>
public Sender CreateSender(string address)
{
    bool connected = this.state > 0 && this.state < 0xff;
    Fx.AssertAndThrow(ErrorCode.ClientNotConnected, connected);

    string linkName = Client.Name + "-sender" + Interlocked.Increment(ref linkId);
    Sender sender = new Sender(this, linkName, address);
    lock (this)
    {
        // a sender has no source; the address is the target
        this.AddLink(sender, false, null, address);
    }

    return sender;
}
/// <summary>
/// Creates a receiving link on the client's session and sends its attach frame.
/// </summary>
/// <param name="address">The address of the node where messages are received.</param>
/// <returns>A Receiver object.</returns>
public Receiver CreateReceiver(string address)
{
    bool connected = this.state > 0 && this.state < 0xff;
    Fx.AssertAndThrow(ErrorCode.ClientNotConnected, connected);

    string linkName = Client.Name + "-receiver" + Interlocked.Increment(ref linkId);
    Receiver receiver = new Receiver(this, linkName, address);
    lock (this)
    {
        // a receiver has no target; the address is the source
        this.AddLink(receiver, true, address, null);
    }

    return receiver;
}
/// <summary>
/// Closes the client. If the connection is open a close frame is sent and the peer's
/// close is awaited (up to 60 seconds); the transport, the links and the heart beat
/// timer are released in every case.
/// </summary>
public void Close()
{
    try
    {
        bool isOpen = this.state > 0 && this.state < 0xff;
        if (isOpen)
        {
            this.state |= CloseSent;
            this.WriteFrame(0, 0, 0x18ul, new List());
            Fx.DebugPrint(true, 0, "close", null);

            // block until the pump thread has seen the peer's close frame
            this.Wait(o => (((Client)o).state & CloseReceived) == 0, this, 60000);
        }
    }
    finally
    {
        NetworkStream stream = this.transport;
        if (stream != null)
        {
            stream.Close();
        }

        this.ClearLinks();

        Timer timer = this.heartBeatTimer;
        if (timer != null)
        {
            // kill heart beat timer
            timer.Dispose();
        }
    }
}
// Blocks the caller while condition(state) is true. The condition is re-evaluated each
// time the signal is set; a wait that is not signaled within millisecondsTimeout fails
// the ClientWaitTimeout assertion.
internal void Wait(Condition condition, object state, int millisecondsTimeout)
{
    for (; ; )
    {
        if (!condition(state))
        {
            return;
        }

        bool signaled = this.signal.WaitOne(millisecondsTimeout, false);
        Fx.AssertAndThrow(ErrorCode.ClientWaitTimeout, signaled);
    }
}
// Encodes the message and writes it as one or more transfer frames on the sender's link.
// Each frame waits for a non-zero outgoing window (up to 60 seconds) and consumes one
// unit of it. Returns the delivery id used for the message.
internal uint Send(Sender sender, Message message, bool settled)
{
    ByteBuffer encoded = new ByteBuffer(128, true);
    // reserve space for frame header and transfer
    encoded.AdjustPosition(TransferFramePrefixSize, 0);
    message.Encode(encoded);

    while (encoded.Length > 0)
    {
        this.Wait(o => ((Client)o).outWindow == 0, this, 60000);

        lock (this)
        {
            this.nextOutgoingId++;
            // uint.MaxValue is left untouched rather than decremented
            if (this.outWindow < uint.MaxValue)
            {
                this.outWindow--;
            }
        }

        int written = this.WriteTransferFrame(sender.Handle, this.deliveryId, settled, encoded, this.maxFrameSize);
        Fx.DebugPrint(true, 0, "transfer", new List { this.deliveryId, settled, written }, "delivery-id", "settled", "payload");
    }

    return this.deliveryId++;
}
// Sends a closing detach for the link, waits (up to 60 seconds) for the peer's detach
// and then, in every case, marks the link dead and frees its slot in the link table.
internal void CloseLink(Link link)
{
    link.State |= DetachSent;
    List detach = new List { link.Handle, true };
    this.WriteFrame(0, 0, 0x16, detach);
    Fx.DebugPrint(true, 0, "detach", detach, "handle");

    try
    {
        this.Wait(o => (((Link)o).State & DetachReceived) == 0, link, 60000);
    }
    finally
    {
        link.State = 0xff;

        // RemoteHandle is either the table index (peer attach received) or the bitwise
        // complement of the index (still waiting for the peer attach), which is
        // negative when viewed as an int.
        int handle = (int)link.RemoteHandle;
        int slot = handle >= 0 ? handle : ~handle;
        lock (this)
        {
            this.links[slot] = null;
        }
    }
}
// Sends a flow frame carrying the current session window state together with the
// given link handle, delivery count and credit.
internal void SendFlow(uint handle, uint dc, uint credit)
{
    List performative;
    lock (this)
    {
        // take a consistent snapshot of the session counters
        performative = new List()
        {
            this.nextIncomingId,
            this.inWindow,
            this.nextOutgoingId,
            this.outWindow,
            handle,
            dc,
            credit
        };
    }

    this.WriteFrame(0, 0, 0x13, performative);
    Fx.DebugPrint(true, 0, "flow", performative, "next-in-id", "in-window", "next-out", "out-window", "handle", "dc", "credit");
}
// Sends a disposition frame for a single delivery (the "last" field is left null).
internal void SendDisposition(bool role, uint deliveryId, bool settled, DescribedValue state)
{
    List performative = new List()
    {
        role,
        deliveryId,
        null,
        settled,
        state
    };

    this.WriteFrame(0, 0, 0x15ul, performative);
    Fx.DebugPrint(true, 0, "disposition", performative, "role", "first", "last");
}
// Invokes the OnError subscribers, if any. A null link means the error applies to the client.
void RaiseErrorEvent(Link link, Symbol error)
{
    // copy the delegate so a concurrent unsubscribe cannot null it between check and call
    ErrorEventHandler handler = this.OnError;
    if (handler == null)
    {
        return;
    }

    handler.Invoke(this, link, error);
}
// Picks the lowest free slot of the link table (returned through index) and the lowest
// local handle not used by any link in the table (the return value). Fails the
// ClientNoHandleAvailable assertion when the table is full.
uint GetLinkHandle(out int index)
{
    int usedHandles = 0;
    int freeSlot = -1;

    // Walking backwards leaves the lowest free slot in freeSlot.
    for (int slot = this.links.Length - 1; slot >= 0; slot--)
    {
        Link current = this.links[slot];
        if (current == null)
        {
            freeSlot = slot;
        }
        else
        {
            usedHandles |= (1 << (int)current.Handle);
        }
    }

    index = freeSlot;
    Fx.AssertAndThrow(ErrorCode.ClientNoHandleAvailable, index >= 0);

    // position of the lowest clear bit is the first unused handle
    uint localHandle;
    for (localHandle = 0; (usedHandles & 1) > 0; usedHandles >>= 1)
    {
        localHandle++;
    }

    return localHandle;
}
// Registers the link in the link table, assigns its local handle and sends the attach
// frame with the given source and target addresses. The caller holds the client lock.
void AddLink(Link link, bool role, string source, string target)
{
    int slot;
    link.Handle = this.GetLinkHandle(out slot);
    // Until the peer's attach arrives the remote handle holds the complement of the
    // table slot, so the slot can be recovered later.
    link.RemoteHandle = (uint)~slot;
    this.links[slot] = link;
    link.State = AttachSent;

    List attach = new List();
    attach.Add(link.Name);
    attach.Add(link.Handle);
    attach.Add(link.Role);
    attach.Add(null);
    attach.Add(null);
    attach.Add(new DescribedValue(0x28ul, new List() { source }));
    attach.Add(new DescribedValue(0x29ul, new List() { target }));

    this.WriteFrame(0, 0, 0x12, attach);
    Fx.DebugPrint(true, 0, "attach", attach, "name", "handle", "role", "snd-mode", "rcv-mode", "source", "target");
}
// Marks every registered link as dead (State = 0xff) and empties the link table.
void ClearLinks()
{
    lock (this)
    {
        int slot = 0;
        while (slot < this.links.Length)
        {
            Link current = this.links[slot];
            if (current != null)
            {
                current.State = 0xff;
                this.links[slot] = null;
            }

            slot++;
        }
    }
}
// Receive loop started by Connect on its own thread. Reads frames from the transport and
// dispatches them to OnFrame until the client is dead (state == 0xFF). After every frame,
// and after a failure, the signal is set so that callers blocked in Wait re-check their
// condition.
void PumpThread()
{
    byte frameType;
    ushort channel;
    ulong code;
    List fields;
    ByteBuffer payload;

    while (this.state < 0xFF)
    {
        try
        {
            this.ReadFrame(out frameType, out channel, out code, out fields, out payload);
            this.OnFrame(channel, code, fields, payload);

            if ((this.state & CloseReceived) != 0)
            {
                // The peer's close has been processed, so no further frame is expected.
                // Stop reading here: otherwise the next read fails once the transport is
                // closed and an orderly close would be reported to OnError as
                // "amqp:connection:reset". 0xFF still has the CloseReceived bit set, so
                // a thread waiting in Close() is released by the signal below.
                this.state = 0xFF;
            }
        }
        catch (Exception)
        {
            // Any read, decode or dispatch failure is fatal for the connection.
            this.transport.Close();
            this.state = 0xFF;
            this.Close();
            this.RaiseErrorEvent(null, new Symbol("amqp:connection:reset"));
        }

        this.signal.Set();
    }
}
// Handles one frame read by the pump thread.
//   channel - channel number from the frame header (only used for tracing)
//   code    - descriptor code of the performative (0x10 open ... 0x18 close)
//   fields  - the performative's field list; null for a frame without a body
//   payload - bytes following the performative (transfer only), or null
// An unknown code fails the ClientInvalidCodeOnFrame assertion. Exceptions raised here
// are caught by PumpThread, which treats them as a connection failure.
void OnFrame(ushort channel, ulong code, List fields, ByteBuffer payload)
{
    if (code == 0 && fields == null)
    {
        // ReadFrame reports a frame without a body (a heart beat from the peer) this way.
        // There is nothing to process, and it must not reach the default case below.
        Fx.DebugPrint(false, channel, "empty", null);
        return;
    }

    switch (code)
    {
        case 0x10ul:  // open
            {
                Fx.DebugPrint(false, channel, "open", fields, "container-id", "host-name", "max-frame-size", "channel-max", "idle-time-out");
                this.state |= OpenReceived;

                // process open.idle-time-out if exists: final value is determined by the min of the local and the remote values
                uint remoteValue = uint.MaxValue;
                if (fields.Count >= 5 && fields[4] != null)
                {
                    remoteValue = (uint)fields[4];
                }

                uint timeout = this.idleTimeout < remoteValue ? this.idleTimeout : remoteValue;
                if (timeout < uint.MaxValue)
                {
                    // Fire ahead of the deadline. Subtracting the 5 second margin from a
                    // timeout of 5 seconds or less would wrap around (uint), so half of
                    // the timeout is used for short values instead.
                    timeout = timeout > 5000 ? timeout - 5000 : timeout / 2;
                    timeout = timeout > MaxIdleTimeout ? MaxIdleTimeout : timeout;
                    if (timeout > 0)
                    {
                        this.heartBeatTimer = new Timer(onHeartBeatTimer, this, (int)timeout, (int)timeout);
                    }
                }

                break;
            }

        case 0x11ul:  // begin
            Fx.DebugPrint(false, channel, "begin", fields, "remote-channel", "next-outgoing-id", "incoming-window", "outgoing-window", "handle-max");
            // the peer's next-outgoing-id is our next-incoming-id,
            // the peer's incoming-window is our outgoing window
            this.nextIncomingId = (uint)fields[1];
            this.outWindow = (uint)fields[2];
            this.state |= BeginReceived;
            break;

        case 0x12ul:  // attach
            {
                Fx.DebugPrint(false, channel, "attach", fields, "name", "handle", "role", "snd-mode", "rcv-mode", "source", "target");
                Link link = null;
                uint remoteHandle = (uint)fields[1];
                Fx.AssertAndThrow(ErrorCode.ClientInvalidHandle, remoteHandle < this.links.Length);
                lock (this)
                {
                    // find the local link by name and move it to the slot that equals
                    // the peer's handle, so later frames can index the table directly
                    for (int i = 0; i < this.links.Length; i++)
                    {
                        if (this.links[i] != null && this.links[i].Name.Equals(fields[0]))
                        {
                            link = this.links[i];
                            int index = (int)~link.RemoteHandle;
                            Fx.AssertAndThrow(ErrorCode.ClientInvalidHandle, index == i);
                            if (index != (int)remoteHandle)
                            {
                                Fx.AssertAndThrow(ErrorCode.ClientHandlInUse, this.links[(int)remoteHandle] == null);
                                this.links[(int)remoteHandle] = link;
                                this.links[i] = null;
                            }

                            break;
                        }
                    }
                }

                Fx.AssertAndThrow(ErrorCode.ClientLinkNotFound, link != null);
                link.RemoteHandle = remoteHandle;
                link.State |= AttachReceived;
                link.OnAttach(fields);
                break;
            }

        case 0x13ul:  // flow
            {
                Fx.DebugPrint(false, channel, "flow", fields, "next-in-id", "in-window", "next-out", "out-window", "handle", "dc", "credit");
                uint nextIncomingId = (uint)fields[0];
                uint incomingWindow = (uint)fields[1];
                lock (this)
                {
                    this.outWindow = incomingWindow < uint.MaxValue ?
                        nextIncomingId + incomingWindow - this.nextOutgoingId :
                        uint.MaxValue;
                }

                // A session level flow may stop after the window fields (this client
                // sends such frames itself), so check the count before reading the handle.
                if (fields.Count > 4 && fields[4] != null)
                {
                    Link link = this.links[(uint)fields[4]];
                    Fx.AssertAndThrow(ErrorCode.ClientLinkNotFound, link != null);
                    link.OnFlow(fields);
                }

                break;
            }

        case 0x14ul:  // transfer
            {
                lock (this)
                {
                    // An incoming transfer advances the incoming id; the outgoing id only
                    // moves when this client sends a transfer (see Send).
                    this.nextIncomingId++;
                    if (--this.inWindow == 0)
                    {
                        // window exhausted: reopen it and tell the peer
                        this.inWindow = DefaultWindowSize;
                        List flow = new List() { this.nextIncomingId, this.inWindow, this.nextOutgoingId, this.outWindow };
                        this.WriteFrame(0, 0, 0x13, flow);
                        Fx.DebugPrint(true, 0, "flow", flow, "next-in-id", "in-window", "next-out", "out-window", "handle", "dc", "credit");
                    }
                }

                Link link = this.links[(uint)fields[0]];
                Fx.AssertAndThrow(ErrorCode.ClientLinkNotFound, link != null);
                ((Receiver)link).OnTransfer(fields, payload);
                break;
            }

        case 0x15ul:  // disposition
            {
                Fx.DebugPrint(false, channel, "disposition", fields, "role", "first", "last");
                bool role = (bool)fields[0];
                uint first = (uint)fields[1];

                // "last" and "state" are optional and may be missing from the list entirely
                uint last = fields.Count > 2 && fields[2] != null ? (uint)fields[2] : first;
                DescribedValue deliveryState = fields.Count > 4 ? fields[4] as DescribedValue : null;

                // the frame is not addressed to one link: offer it to every link of the opposite role
                for (int i = 0; i < this.links.Length; i++)
                {
                    Link link = this.links[i];
                    if (link != null && role != link.Role)
                    {
                        link.OnDisposition(first, last, deliveryState);
                    }
                }

                break;
            }

        case 0x16ul:  // detach
            {
                Fx.DebugPrint(false, channel, "detach", fields, "handle");
                Link link = this.links[(uint)fields[0]];
                Fx.AssertAndThrow(ErrorCode.ClientLinkNotFound, link != null);
                link.State |= DetachReceived;
                if ((link.State & DetachSent) == 0)
                {
                    // peer initiated: answer the detach and report it as an error
                    this.CloseLink(link);
                    this.RaiseErrorEvent(link, GetError(fields, 2));
                }

                break;
            }

        case 0x17ul:  // end
            Fx.DebugPrint(false, channel, "end", null);
            this.state |= EndReceived;
            if ((this.state & EndSent) == 0)
            {
                this.WriteFrame(0, 0, 0x17ul, new List());
                Fx.DebugPrint(true, channel, "end", null);
                this.ClearLinks();
            }

            break;

        case 0x18ul:  // close
            Fx.DebugPrint(false, channel, "close", null);
            this.state |= CloseReceived;
            if ((this.state & CloseSent) == 0)
            {
                // peer initiated: answer the close and report it as an error
                this.Close();
                this.RaiseErrorEvent(null, GetError(fields, 0));
            }

            break;

        default:
            Fx.AssertAndThrow(ErrorCode.ClientInvalidCodeOnFrame, false);
            break;
    }
}
// Encodes a performative (descriptor code plus field list) into a new frame and writes
// it to the transport.
//   frameType - value written to the frame header's type byte
//   channel   - value written to the frame header's channel field
//   code      - descriptor code of the performative
//   fields    - field values, encoded in order as a list32
// Sets sendActive so the heart beat timer sees that a frame was written.
void WriteFrame(byte frameType, ushort channel, ulong code, List fields)
{
ByteBuffer buffer = new ByteBuffer(64, true);
// frame header
// reserve room for the frame size; it is filled in at the end
buffer.Append(FixedWidth.UInt);
// same value as the first byte of the "DOF, type and channel" group in WriteTransferFrame
AmqpBitConverter.WriteUByte(buffer, 2);
AmqpBitConverter.WriteUByte(buffer, (byte)frameType);
AmqpBitConverter.WriteUShort(buffer, channel);
// command
AmqpBitConverter.WriteUByte(buffer, FormatCode.Described);
Encoder.WriteULong(buffer, code, true);
AmqpBitConverter.WriteUByte(buffer, FormatCode.List32);
// reserve 8 bytes: list size (patched after the fields are encoded) and element count
int sizeOffset = buffer.WritePos;
buffer.Append(8);
AmqpBitConverter.WriteInt(buffer.Buffer, sizeOffset + 4, fields.Count);
for (int i = 0; i < fields.Count; i++)
{
Encoder.WriteObject(buffer, fields[i]);
}
// patch the list size now that the encoded length is known
AmqpBitConverter.WriteInt(buffer.Buffer, sizeOffset, buffer.Length - sizeOffset);
AmqpBitConverter.WriteInt(buffer.Buffer, 0, buffer.Length); // frame size
this.transport.Write(buffer.Buffer, buffer.Offset, buffer.Length);
this.sendActive = true;
}
// Writes one transfer frame for (part of) an encoded message and returns the number of
// payload bytes it carried.
//   handle       - link handle; only its low byte is written
//   deliveryId   - written as the delivery id and, as 4 bytes, as the delivery tag
//   settled      - value of the transfer's settled field
//   buffer       - encoded message with TransferFramePrefixSize free bytes in front of
//                  buffer.Offset; the frame header and transfer performative are written
//                  into that space so header and payload go out in a single write
//   maxFrameSize - upper bound for the size of the frame
// After the write, buffer.Complete(payloadSize) is called for the bytes that were sent.
// The "more" flag is set when the buffer holds more bytes than fit into this frame.
// Sets sendActive so the heart beat timer sees that a frame was written.
int WriteTransferFrame(uint handle, uint deliveryId, bool settled, ByteBuffer buffer, int maxFrameSize)
{
// payload should have bytes reserved for frame header and transfer
int frameSize = Math.Min(buffer.Length + TransferFramePrefixSize, maxFrameSize);
int payloadSize = frameSize - TransferFramePrefixSize;
// the frame starts in the reserved space right in front of the payload
int offset = buffer.Offset - TransferFramePrefixSize;
int pos = offset;
// frame size
buffer.Buffer[pos++] = (byte)(frameSize >> 24);
buffer.Buffer[pos++] = (byte)(frameSize >> 16);
buffer.Buffer[pos++] = (byte)(frameSize >> 8);
buffer.Buffer[pos++] = (byte)frameSize;
// DOF, type and channel
buffer.Buffer[pos++] = 0x02;
buffer.Buffer[pos++] = 0x00;
buffer.Buffer[pos++] = 0x00;
buffer.Buffer[pos++] = 0x00;
// transfer(list8-size,count)
// NOTE(review): the six fields written below occupy 16 bytes, which equals the size
// byte 0x10; confirm whether the list8 size is expected to include the count byte too.
buffer.Buffer[pos++] = 0x00;
buffer.Buffer[pos++] = 0x53;
buffer.Buffer[pos++] = 0x14;
buffer.Buffer[pos++] = 0xc0;
buffer.Buffer[pos++] = 0x10;
buffer.Buffer[pos++] = 0x06;
buffer.Buffer[pos++] = 0x52; // handle
buffer.Buffer[pos++] = (byte)handle;
buffer.Buffer[pos++] = 0x70; // delivery id: uint
buffer.Buffer[pos++] = (byte)(deliveryId >> 24);
buffer.Buffer[pos++] = (byte)(deliveryId >> 16);
buffer.Buffer[pos++] = (byte)(deliveryId >> 8);
buffer.Buffer[pos++] = (byte)deliveryId;
buffer.Buffer[pos++] = 0xa0; // delivery tag: bin8
buffer.Buffer[pos++] = 0x04;
buffer.Buffer[pos++] = (byte)(deliveryId >> 24);
buffer.Buffer[pos++] = (byte)(deliveryId >> 16);
buffer.Buffer[pos++] = (byte)(deliveryId >> 8);
buffer.Buffer[pos++] = (byte)deliveryId;
buffer.Buffer[pos++] = 0x43; // message-format
buffer.Buffer[pos++] = settled ? (byte)0x41 : (byte)0x42; // settled
buffer.Buffer[pos++] = buffer.Length > payloadSize ? (byte)0x41 : (byte)0x42; // more
// the 30 bytes written above fill exactly the reserved prefix (TransferFramePrefixSize)
this.transport.Write(buffer.Buffer, offset, frameSize);
this.sendActive = true;
buffer.Complete(payloadSize);
return payloadSize;
}
// Reads one frame from the transport: the 8 byte header, then the body, if any.
// A frame without a body yields code 0 with null fields and payload. Otherwise the body
// must start with a described value whose descriptor becomes code and whose list becomes
// fields; bytes left after the list are returned as payload.
void ReadFrame(out byte frameType, out ushort channel, out ulong code, out List fields, out ByteBuffer payload)
{
    byte[] header = this.ReadFixedSizeBuffer(8);
    int bodySize = AmqpBitConverter.ReadInt(header, 0) - 8;
    frameType = header[5];    // TODO: header EXT
    channel = (ushort)(header[6] << 8 | header[7]);

    // defaults for a frame that has no body
    code = 0;
    fields = null;
    payload = null;
    if (bodySize <= 0)
    {
        return;
    }

    byte[] body = this.ReadFixedSizeBuffer(bodySize);
    ByteBuffer reader = new ByteBuffer(body, 0, bodySize, bodySize);
    Fx.AssertAndThrow(ErrorCode.ClientInvalidFormatCodeRead, Encoder.ReadFormatCode(reader) == FormatCode.Described);
    code = Encoder.ReadULong(reader, Encoder.ReadFormatCode(reader));
    fields = Encoder.ReadList(reader, Encoder.ReadFormatCode(reader));

    // whatever follows the performative is the payload
    if (reader.Length > 0)
    {
        payload = new ByteBuffer(reader.Buffer, reader.Offset, reader.Length, reader.Length);
    }
}
// Reads the next frame and returns its field list after asserting that the frame has
// the expected type, channel and descriptor code, carries a field list and has no payload.
List ReadFrameBody(byte frameType, ushort channel, ulong code)
{
    byte actualType;
    ushort actualChannel;
    ulong actualCode;
    List body;
    ByteBuffer extra;
    this.ReadFrame(out actualType, out actualChannel, out actualCode, out body, out extra);

    Fx.AssertAndThrow(ErrorCode.ClientInvalidFrameType, actualType == frameType);
    Fx.AssertAndThrow(ErrorCode.ClientInvalidChannel, actualChannel == channel);
    Fx.AssertAndThrow(ErrorCode.ClientInvalidCode, actualCode == code);
    Fx.AssertAndThrow(ErrorCode.ClientInvalidFieldList, body != null);
    Fx.AssertAndThrow(ErrorCode.ClientInvalidPayload, extra == null);

    return body;
}
// Reads exactly size bytes from the transport, looping over partial reads, and returns
// them in a new array. Throws System.IO.IOException if the stream ends before all bytes
// have arrived.
byte[] ReadFixedSizeBuffer(int size)
{
    byte[] buffer = new byte[size];
    int offset = 0;
    while (size > 0)
    {
        int bytes = this.transport.Read(buffer, offset, size);
        if (bytes <= 0)
        {
            // Read reports the end of the stream (peer closed the connection) by
            // returning 0. Without this check the loop would never make progress again.
            throw new System.IO.IOException("The connection was closed by the remote peer.");
        }

        offset += bytes;
        size -= bytes;
    }

    return buffer;
}
// Resolves host, connects a TCP socket to the first address that accepts the connection
// and returns a stream over it (an authenticated SslStream when useSsl is true).
// Throws the last SocketException if no address could be connected, and
// ArgumentException if the host did not resolve to any address.
static NetworkStream Connect(string host, int port, bool useSsl)
{
    var ipHostEntry = Dns.GetHostEntry(host);
    Socket socket = null;
    SocketException exception = null;
    foreach (var ipAddress in ipHostEntry.AddressList)
    {
        if (ipAddress == null) continue;

        Socket candidate = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        try
        {
            candidate.Connect(new IPEndPoint(ipAddress, port));
            socket = candidate;
            exception = null;
            break;
        }
        catch (SocketException socketException)
        {
            // remember the failure and release the socket before trying the next address
            exception = socketException;
            candidate.Close();
        }
    }

    if (exception != null)
    {
        throw exception;
    }

    if (socket == null)
    {
        // the address list was empty (or held only null entries)
        throw new ArgumentException("No IP address was resolved for host '" + host + "'.");
    }

    try
    {
        NetworkStream stream;
        if (useSsl)
        {
            // NOTE(review): the protocol versions below are fixed at TLS 1.0 / TLS 1.1;
            // confirm whether the target platform offers a newer version.
            SslStream sslStream = new SslStream(socket);
#if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || MF_FRAMEWORK_VERSION_V4_4)
            sslStream.AuthenticateAsClient(host, null, SslVerification.VerifyPeer, SslProtocols.TLSv1);
#elif (NANOFRAMEWORK_1_0)
            sslStream.AuthenticateAsClient(host, null, SslProtocols.Tls11);
#endif
            stream = sslStream;
        }
        else
        {
            stream = new NetworkStream(socket, true);
        }

        return stream;
    }
    catch
    {
        // stream creation or the TLS handshake failed: do not leak the connected socket
        socket.Close();
        throw;
    }
}
// Normalizes a "multiple" field value: a Symbol array is returned as is, a single Symbol
// is wrapped into a one element array, anything else is rejected with an exception.
static Symbol[] GetSymbolMultiple(object multiple)
{
    Symbol[] many = multiple as Symbol[];
    if (many == null)
    {
        Symbol single = multiple as Symbol;
        if (single == null)
        {
            throw new Exception("object is not a multiple type");
        }

        many = new Symbol[] { single };
    }

    return many;
}
// Compares the first 8 bytes of two protocol headers and stops at the first difference.
// Both arrays are assumed to hold at least 8 bytes.
static bool AreHeaderEqual(byte[] b1, byte[] b2)
{
    for (int i = 0; i < 8; i++)
    {
        if (b1[i] != b2[i])
        {
            return false;
        }
    }

    return true;
}
// Extracts the error condition from a performative. The field at errorIndex must be a
// described value with descriptor 0x1d whose list is not empty; its first element is
// returned. Returns null when the field is absent, null or of a different shape.
static Symbol GetError(List fields, int errorIndex)
{
    if (fields.Count <= errorIndex || fields[errorIndex] == null)
    {
        return null;
    }

    DescribedValue described = fields[errorIndex] as DescribedValue;
    if (described == null || !described.Descriptor.Equals(0x1dul))
    {
        return null;
    }

    List details = (List)described.Value;
    return details.Count > 0 ? (Symbol)details[0] : null;
}
// Heart beat timer callback; state is the owning Client. If no frame was written since
// the previous tick an empty frame is sent. A failed write marks the client dead and
// closes it. The sendActive flag is cleared on every tick.
static void OnHeartBeatTimer(object state)
{
    Client client = (Client)state;
    if (client.sendActive)
    {
        // something was sent during this period; just start a new one
        client.sendActive = false;
        return;
    }

    try
    {
        // frame of size 8 with no body
        byte[] emptyFrame = new byte[] { 0, 0, 0, 8, 2, 0, 0, 0 };
        client.transport.Write(emptyFrame, 0, emptyFrame.Length);
        client.transport.Flush();
        Fx.DebugPrint(true, 0, "empty", null);
    }
    catch
    {
        client.state = 0xff;
        client.Close();
    }

    client.sendActive = false;
}
}
}