in Clients/CSharp/AmbrosiaLibCS/Immortal.cs [286:666]
// Receive loop of this Immortal: reads messages from _ambrosiaReceiveFromStream and handles each one
// according to its leading type byte.
//
// bytesToRead: bytes still outstanding from a batch whose header was already consumed by a previous
//              loop (the upgrade path hands this value to the new Immortal). With the default of 0 a
//              fresh batch header is read from the stream before any message is deserialized.
//
// The loop ends when
//   - cancelTokenSource is cancelled,
//   - DispatchTaskIdQueue holds more than one entry (one entry is dequeued and this loop exits so
//     another dispatch loop can take over), or
//   - an upgrade message is handled (the upgraded Immortal's dispatch loop is started and this
//     method returns).
//
// Throws:
//   Exception                 - unknown leading message byte, a batch element that does not start
//                               with the RPC byte, or an upgrade message on a non-upgradeable deployment.
//   ArgumentException         - unfamiliar ReturnValueTypes value on a return message.
//   InvalidOperationException - return message whose sequence number is not in CallCache, or no
//                               public method on Immortal carries DeserializeNextValueAttribute.
protected async Task Dispatch(int bytesToRead = 0)
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
{
    #region RPC Encoding
    // Layout of a single RPC as parsed below (after the FlexReadBuffer length prefix):
    //
    //   call:         |R|ret|m|f|b|lFR|n|args|
    //   return value: |R|ret|b|lFR|n|returnValue|
    //
    // where:
    //   R           = RPC byte (byte)
    //   ret         = ReturnValueTypes value (byte); None means this is a call, anything else is a return
    //   m           = method id of the method that is being called (variable)
    //   f           = RpcTypes.RpcType (byte); tells whether this is a fire-and-forget message
    //   b           = size of the return address / sender name (variable)
    //   lFR         = (UTF-8 bytes encoding) the return address / sender name
    //   n           = sequence number (variable)
    //   args        = serialized arguments, number and size baked into the generated code
    //   returnValue = serialized return value or exception, size baked into the generated code
    //                 (nothing is read for EmptyReturnValue)
    //
    // For calls, b, lFR, and n are optional (as a unit): they are present only when f says the
    // call is not fire-and-forget.
    //
    // A batch message is |batch byte|count|, then for CountReplayableRPCBatchByte only a second
    // count, followed per RPC by |length|RPC|.
    #endregion
    _inputFlexBuffer = new FlexReadBuffer();
    int RPCsReceived = 0;
    // Open generic DeserializeNextValue method, located via reflection the first time a return
    // value arrives and then reused; the lookup result does not depend on the message.
    MethodInfo deserializeNextValueDefinition = null;
    while (!cancelTokenSource.IsCancellationRequested)
    {
        lock (DispatchTaskIdQueueLock)
        {
            if (this.DispatchTaskIdQueue.Data.Count > 1)
            {
                int x;
                while (!this.DispatchTaskIdQueue.Data.TryDequeue(out x)) { }
                break; // some other dispatch loop will take over, so just die.
            }
        }
        if (bytesToRead <= 24)
        {
            // Batch header: only the second field (the byte count) is used here, but all four
            // must be consumed to keep the stream positioned on the first message.
            // NOTE(review): int + int + long + long is presumably the 24 bytes the thresholds
            // in this loop refer to (4 + 4 + 8 + 8) - confirm against the fixed-size encodings.
            int commitID = await this._ambrosiaReceiveFromStream.ReadIntFixedAsync(cancelTokenSource.Token);
            bytesToRead = await this._ambrosiaReceiveFromStream.ReadIntFixedAsync(cancelTokenSource.Token);
            long checkBytes = await this._ambrosiaReceiveFromStream.ReadLongFixedAsync(cancelTokenSource.Token);
            long writeSeqID = await this._ambrosiaReceiveFromStream.ReadLongFixedAsync(cancelTokenSource.Token);
        }
        while (bytesToRead > 24)
        {
            await FlexReadBuffer.DeserializeAsync(this._ambrosiaReceiveFromStream, _inputFlexBuffer, cancelTokenSource.Token);
            bytesToRead -= _inputFlexBuffer.Length;
            _cursor = _inputFlexBuffer.LengthLength; // this way we don't need to compute how much space was used to represent the length of the buffer.
            var firstByte = _inputFlexBuffer.Buffer[_cursor];
            switch (firstByte)
            {
                case AmbrosiaRuntimeLBConstants.InitalMessageByte:
                    {
#if DEBUG
                        Console.WriteLine("*X* Received an initial message");
#endif
                        _cursor++;
                        var messageLength = _inputFlexBuffer.Buffer.ReadBufferedInt(_cursor);
                        _cursor += IntSize(messageLength);
                        var messageBuffer = new byte[messageLength];
                        Buffer.BlockCopy(_inputFlexBuffer.Buffer, _cursor, messageBuffer, 0, messageLength);
                        var message = Encoding.UTF8.GetString(messageBuffer);
                        // Actually, the message is just for fun. It is a signal to call OnFirstStart()
                        await this.OnFirstStartWrapper();
                        break;
                    }
                case AmbrosiaRuntimeLBConstants.checkpointByte:
                    {
#if DEBUG
                        Console.WriteLine("*X* Received a checkpoint message");
#endif
                        // TODO: this message should contain a (serialized - doh!) checkpoint. Restore the state.
                        _cursor++;
                        var sizeOfCheckpoint = _inputFlexBuffer.Buffer.ReadBufferedLong(_cursor);
                        _cursor += LongSize(sizeOfCheckpoint);
                        // The checkpoint payload is not taken from _inputFlexBuffer: it is read
                        // directly from the receive stream through a wrapper that is given
                        // sizeOfCheckpoint (presumably as the number of bytes it may consume).
                        using (var readStreamWrapper = new PassThruReadStream(_ambrosiaReceiveFromStream, sizeOfCheckpoint))
                        {
                            CopyFromDeserializedImmortal(readStreamWrapper);
                        }
                        break;
                    }
                case AmbrosiaRuntimeLBConstants.takeCheckpointByte:
                    {
#if DEBUG
                        Console.WriteLine("*X* Received a take checkpoint message");
#endif
                        _cursor++;
                        await this.TakeCheckpointAsync();
                        break;
                    }
                case AmbrosiaRuntimeLBConstants.becomingPrimaryByte:
                    {
#if DEBUG
                        Console.WriteLine("*X* Received a becoming primary message");
#endif
                        _cursor++;
                        this.IsPrimary = true;
                        this.BecomingPrimary();
                        break;
                    }
                case AmbrosiaRuntimeLBConstants.takeBecomingPrimaryCheckpointByte:
                    {
#if DEBUG
                        Console.WriteLine("*X* Received a take becoming primary checkpoint message");
#endif
                        _cursor++;
                        // Checkpoint first, then flip to primary.
                        await this.TakeCheckpointAsync();
                        this.IsPrimary = true;
                        this.BecomingPrimary();
                        break;
                    }
                case AmbrosiaRuntimeLBConstants.upgradeTakeCheckpointByte:
                case AmbrosiaRuntimeLBConstants.upgradeServiceByte:
                    {
                        if (firstByte == AmbrosiaRuntimeLBConstants.upgradeTakeCheckpointByte)
                        {
#if DEBUG
                            Console.WriteLine("*X* Received a upgrade and take checkpoint message");
#endif
                        }
                        else
                        {
#if DEBUG
                            Console.WriteLine("*X* Received a upgrade service message");
#endif
                        }
                        _cursor++;
                        if (this.upgradeInterface == null || this.upgradeImmortalType == null)
                        {
                            throw new Exception("Non-upgradeable deployment received an upgrade message.");
                        }
                        // Build the upgraded Immortal plus the serializer and dispatcher generated
                        // for the upgraded interface (both are looked up by name in its assembly).
                        var newImmortal = (Immortal)Activator.CreateInstance(this.upgradeImmortalType, this);
                        var upgradedImmortalSerializerType = upgradeInterface.Assembly.GetType("Ambrosia.ImmortalSerializer");
                        var immortalSerializer = (ImmortalSerializerBase)Activator.CreateInstance(upgradedImmortalSerializerType);
                        var upgradedDispatcherType = upgradeInterface.Assembly.GetType(upgradeInterface.FullName + "_Dispatcher_Implementation");
                        var untypedProxy = Activator.CreateInstance(upgradedDispatcherType, newImmortal, immortalSerializer, "", -1 /*ignored when not setting up connections*/, -1 /*ignored when not setting up connections*/, false);
                        var typedProxy = (Dispatcher)untypedProxy;
                        // Copy over all of the state from this Immortal to the new one
                        // (every instance field declared on Immortal, public or not).
                        foreach (var f in typeof(Immortal).GetFields(BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public))
                        {
                            f.SetValue(newImmortal, f.GetValue(this));
                        }
                        // IMPORTANT: But the value for the back pointer to the server proxy should be the newly generated proxy
                        newImmortal._dispatcher = typedProxy;
                        if (firstByte == AmbrosiaRuntimeLBConstants.upgradeTakeCheckpointByte)
                        {
                            await newImmortal.TakeCheckpointAsync();
                            newImmortal.IsPrimary = true;
                            newImmortal.BecomingPrimary();
                        }
                        // Start the new Immortal: start its Dispatch loop (BUT, perhaps still reading from the same page)
                        // which is why the remaining byte count is handed over.
                        var t = newImmortal.DispatchWrapper(bytesToRead);
                        // IMPORTANT: set the pointer from the old server proxy to the new one
                        // This allows the new proxy to get disposed when (and not until!) the old
                        // one is disposed.
                        this._dispatcher.upgradedProxy = typedProxy;
                        // Need to die now, so do that by exiting loop
                        t.Start(newImmortal.DispatchTaskScheduler);
                        return;
                    }
                case AmbrosiaRuntimeLBConstants.RPCByte:
                case AmbrosiaRuntimeLBConstants.RPCBatchByte:
                case AmbrosiaRuntimeLBConstants.CountReplayableRPCBatchByte:
                    {
                        RPCsReceived++;
                        var numberOfRPCs = 1;
                        var lengthOfCurrentRPC = 0;
                        int endIndexOfCurrentRPC = 0;
                        if (firstByte == AmbrosiaRuntimeLBConstants.RPCBatchByte || firstByte == AmbrosiaRuntimeLBConstants.CountReplayableRPCBatchByte)
                        {
                            _cursor++;
                            numberOfRPCs = _inputFlexBuffer.Buffer.ReadBufferedInt(_cursor);
                            _cursor += IntSize(numberOfRPCs);
                            if (firstByte == AmbrosiaRuntimeLBConstants.CountReplayableRPCBatchByte)
                            {
                                // The replayable count is only skipped over; its value is not used here.
                                var numReplayableRPCs = _inputFlexBuffer.Buffer.ReadBufferedInt(_cursor);
                                _cursor += IntSize(numReplayableRPCs);
                            }
                        }
                        else
                        {
                            // Single RPC: its arguments run to the end of the message.
                            endIndexOfCurrentRPC = _inputFlexBuffer.Length;
                        }
                        for (int i = 0; i < numberOfRPCs; i++)
                        {
                            // NOTE(review): the per-RPC length prefix is only read when the count
                            // is greater than one. A batch message carrying exactly one RPC would
                            // be parsed as if it had no length prefix - confirm that senders never
                            // emit such a batch.
                            if (1 < numberOfRPCs)
                            {
                                lengthOfCurrentRPC = _inputFlexBuffer.Buffer.ReadBufferedInt(_cursor);
                                _cursor += IntSize(lengthOfCurrentRPC);
                                endIndexOfCurrentRPC = _cursor + lengthOfCurrentRPC;
                            }
                            var shouldBeRPCByte = _inputFlexBuffer.Buffer[_cursor];
                            if (shouldBeRPCByte != AmbrosiaRuntimeLBConstants.RPCByte)
                            {
                                Console.WriteLine("UNKNOWN BYTE: {0}!!", shouldBeRPCByte);
                                throw new Exception("Illegal leading byte in message");
                            }
                            _cursor++;
                            var returnValueType = (ReturnValueTypes)_inputFlexBuffer.Buffer[_cursor++];
                            if (returnValueType != ReturnValueTypes.None) // receiving a return value
                            {
                                var senderOfRPCLength = _inputFlexBuffer.Buffer.ReadBufferedInt(_cursor);
                                var sizeOfSender = IntSize(senderOfRPCLength);
                                _cursor += sizeOfSender;
                                var senderOfRPC = Encoding.UTF8.GetString(_inputFlexBuffer.Buffer, _cursor, senderOfRPCLength);
                                _cursor += senderOfRPCLength;
                                var sequenceNumber = _inputFlexBuffer.Buffer.ReadBufferedLong(_cursor);
                                _cursor += LongSize(sequenceNumber);
#if DEBUG
                                Console.WriteLine($"*X* Got response for {sequenceNumber} from {senderOfRPC}");
#endif
                                // The pending call is removed from the cache here, so every path
                                // below must either complete it or fail loudly.
                                if (this.CallCache.Data.TryRemove(sequenceNumber, out var taskCompletionSource))
                                {
                                    switch (returnValueType)
                                    {
                                        case ReturnValueTypes.ReturnValue:
                                            if (deserializeNextValueDefinition == null)
                                            {
                                                deserializeNextValueDefinition = typeof(Immortal).GetMethods().FirstOrDefault(m => m.GetCustomAttributes(typeof(DeserializeNextValueAttribute)).Any());
                                                if (deserializeNextValueDefinition == null)
                                                {
                                                    // Without the deserializer the pending call could never be
                                                    // completed and its awaiter would wait forever.
                                                    throw new InvalidOperationException($"Can't deserialize the return value for sequence number {sequenceNumber}: no public method on {nameof(Immortal)} is marked with {nameof(DeserializeNextValueAttribute)}");
                                                }
                                            }
                                            // The result type is only known at run time, so the generic
                                            // deserializer is closed over it and invoked via reflection.
                                            var genericDeserializeNextValueMethod = deserializeNextValueDefinition.MakeGenericMethod(taskCompletionSource.ResultType.Type);
                                            var result = genericDeserializeNextValueMethod.Invoke(this, null);
                                            taskCompletionSource.SetResult(result);
                                            break;
                                        case ReturnValueTypes.EmptyReturnValue:
                                            taskCompletionSource.SetResult(GetDefault(taskCompletionSource.ResultType.Type));
                                            break;
                                        case ReturnValueTypes.Exception:
                                            var exceptionObj = DeserializeNextValue<object>();
                                            var exception = new SerializableException(exceptionObj, senderOfRPC);
                                            taskCompletionSource.SetException(exception);
                                            break;
                                        default:
                                            throw new ArgumentException($"Got an unfamiliar ReturnValueType: {returnValueType}");
                                    }
                                }
                                else
                                {
                                    var errorMessage = $"Can't find sequence number {sequenceNumber} in cache";
                                    throw new InvalidOperationException(errorMessage);
                                }
                                await Task.Yield();
                            }
                            else // receiving an RPC
                            {
                                var methodId = _inputFlexBuffer.Buffer.ReadBufferedInt(_cursor);
                                _cursor += IntSize(methodId);
                                var rpcType = (RpcTypes.RpcType)_inputFlexBuffer.Buffer[_cursor++];
                                string senderOfRPC = null;
                                long sequenceNumber = 0;
                                if (!rpcType.IsFireAndForget())
                                {
                                    // read return address and sequence number
                                    var senderOfRPCLength = _inputFlexBuffer.Buffer.ReadBufferedInt(_cursor);
                                    var sizeOfSender = IntSize(senderOfRPCLength);
                                    _cursor += sizeOfSender;
                                    senderOfRPC = Encoding.UTF8.GetString(_inputFlexBuffer.Buffer, _cursor, senderOfRPCLength);
                                    _cursor += senderOfRPCLength;
                                    sequenceNumber = _inputFlexBuffer.Buffer.ReadBufferedLong(_cursor);
                                    _cursor += LongSize(sequenceNumber);
                                }
                                // Everything from the cursor to the end of this RPC is the serialized
                                // argument list; it is copied into its own buffer and handed to the
                                // dispatcher starting at offset 0.
                                var lengthOfSerializedArguments = endIndexOfCurrentRPC - _cursor;
                                byte[] localBuffer = new byte[lengthOfSerializedArguments];
                                Buffer.BlockCopy(_inputFlexBuffer.Buffer, _cursor, localBuffer, 0, lengthOfSerializedArguments);
                                try
                                {
                                    // The call is awaited inline, so the next RPC is not parsed
                                    // until this await completes.
                                    await _dispatcher.DispatchToMethod(methodId, rpcType, senderOfRPC, sequenceNumber, localBuffer, 0);
                                }
                                catch (Exception ex)
                                {
                                    // A failing handler does not stop the dispatch loop.
                                    // NOTE(review): only the message is printed (no newline, no
                                    // stack trace) - consider richer logging.
                                    Console.Write(ex.Message);
                                }
                                _cursor += lengthOfSerializedArguments;
                            }
                        }
                        break;
                    }
                default:
                    {
                        var s = String.Format("Illegal leading byte in message: {0}", firstByte);
#if DEBUG
                        Console.WriteLine(s);
#endif
                        throw new Exception(s);
                    }
            }
            _inputFlexBuffer.ResetBuffer();
        }
    }
}