protected async Task Dispatch()

in Clients/CSharp/AmbrosiaLibCS/Immortal.cs [286:666]


        protected async Task Dispatch(int bytesToRead = 0)
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
        {
#if DEBUG
            //Console.WriteLine($"Dispatch loop starting on task '{Thread.CurrentThread.ManagedThreadId}'");
#endif

            #region RPC Encoding
            //       |m|f|b| lFR|n| args|
            // |R|ret|
            //       |n|returnValue|
            // where:
            // R = RPC byte (byte)
            // ret = whether this is a return value or not (byte)

            // if it is *not* a return value
            // m = method id of the method that is being called (variable)
            // f = whether this is a fire-and-forget message (byte)
            // b = size of the return address (variable)
            // lFR = (bytes encoding) the return address
            // n = sequence number (variable)
            // args = serialized arguments, number and size baked into the generated code
            //
            // b, lFR, and n are optional (as a unit).
            // the value for f indicates whether they are present or not.

            // if it is a return value
            // n = sequence number (variable)
            // returnValue = serialized return value, size baked into the generated code

            #endregion

            _inputFlexBuffer = new FlexReadBuffer();
            int RPCsReceived = 0;

            while (!cancelTokenSource.IsCancellationRequested)
            {
                //Console.WriteLine("Waiting for next batch of messages from the LAR");

                lock (DispatchTaskIdQueueLock)
                {
                    if (this.DispatchTaskIdQueue.Data.Count > 1)
                    {
                        int x;
                        while (!this.DispatchTaskIdQueue.Data.TryDequeue(out x)) { }
                        break; // some other dispatch loop will take over, so just die.
                    }
                }

                if (bytesToRead <= 24)
                {
                    int commitID = await this._ambrosiaReceiveFromStream.ReadIntFixedAsync(cancelTokenSource.Token);
                    bytesToRead = await this._ambrosiaReceiveFromStream.ReadIntFixedAsync(cancelTokenSource.Token);
                    long checkBytes = await this._ambrosiaReceiveFromStream.ReadLongFixedAsync(cancelTokenSource.Token);
                    long writeSeqID = await this._ambrosiaReceiveFromStream.ReadLongFixedAsync(cancelTokenSource.Token);
                }

                while (bytesToRead > 24)
                {
                    //Console.WriteLine("Waiting for the deserialization of a message from the LAR");

                    await FlexReadBuffer.DeserializeAsync(this._ambrosiaReceiveFromStream, _inputFlexBuffer, cancelTokenSource.Token);

                    bytesToRead -= _inputFlexBuffer.Length;

                    _cursor = _inputFlexBuffer.LengthLength; // this way we don't need to compute how much space was used to represent the length of the buffer.

                    var firstByte = _inputFlexBuffer.Buffer[_cursor];

                    switch (firstByte)
                    {
                        case AmbrosiaRuntimeLBConstants.InitalMessageByte:
                            {
#if DEBUG
                                Console.WriteLine("*X* Received an initial message");
#endif

                                _cursor++;
                                var messageLength = _inputFlexBuffer.Buffer.ReadBufferedInt(_cursor);
                                _cursor += IntSize(messageLength);
                                var messageBuffer = new byte[messageLength];
                                Buffer.BlockCopy(_inputFlexBuffer.Buffer, _cursor, messageBuffer, 0, messageLength);
                                var message = Encoding.UTF8.GetString(messageBuffer);
                                // Actually, the message is just for fun. It is a signal to call OnFirstStart()
                                //Task.Factory.StartNew(
                                //    () => this.OnFirstStart()
                                //        , CancellationToken.None, TaskCreationOptions.DenyChildAttach
                                //        , DispatchTaskScheduler
                                //    );

                                await this.OnFirstStartWrapper();

                                break;
                            }
                        case AmbrosiaRuntimeLBConstants.checkpointByte:
                            {
#if DEBUG
                                Console.WriteLine("*X* Received a checkpoint message");
#endif
                                // TODO: this message should contain a (serialized - doh!) checkpoint. Restore the state.
                                _cursor++;

                                var sizeOfCheckpoint = _inputFlexBuffer.Buffer.ReadBufferedLong(_cursor);
                                _cursor += LongSize(sizeOfCheckpoint);
                                using (var readStreamWrapper = new PassThruReadStream(_ambrosiaReceiveFromStream, sizeOfCheckpoint))
                                {
                                    CopyFromDeserializedImmortal(readStreamWrapper);
                                }
                                break;
                            }

                        case AmbrosiaRuntimeLBConstants.takeCheckpointByte:
                            {
#if DEBUG
                                Console.WriteLine("*X* Received a take checkpoint message");
#endif
                                _cursor++;

                                await this.TakeCheckpointAsync();

                                break;
                            }

                        case AmbrosiaRuntimeLBConstants.becomingPrimaryByte:
                            {
#if DEBUG
                                Console.WriteLine("*X* Received a becoming primary message");
#endif
                                _cursor++;
                                this.IsPrimary = true;
                                this.BecomingPrimary();
                                break;
                            }


                        case AmbrosiaRuntimeLBConstants.takeBecomingPrimaryCheckpointByte:
                            {
#if DEBUG
                                Console.WriteLine("*X* Received a take checkpoint message");
#endif
                                _cursor++;

                                await this.TakeCheckpointAsync();
                                this.IsPrimary = true;
                                this.BecomingPrimary();

                                break;
                            }

                        case AmbrosiaRuntimeLBConstants.upgradeTakeCheckpointByte:
                        case AmbrosiaRuntimeLBConstants.upgradeServiceByte:
                            {
                                if (firstByte == AmbrosiaRuntimeLBConstants.upgradeTakeCheckpointByte)
                                {
#if DEBUG
                                    Console.WriteLine("*X* Received a upgrade and take checkpoint message");
#endif
                                }
                                else
                                {
#if DEBUG
                                    Console.WriteLine("*X* Received a upgrade service message");
#endif
                                }
                                _cursor++;

                                if (this.upgradeInterface == null || this.upgradeImmortalType == null)
                                {
                                    throw new Exception("Non-upgradeable deployment received an upgrade message.");
                                }

                                var newImmortal = (Immortal)Activator.CreateInstance(this.upgradeImmortalType, this);
                                var upgradedImmortalSerializerType = upgradeInterface.Assembly.GetType("Ambrosia.ImmortalSerializer");

                                var immortalSerializer = (ImmortalSerializerBase)Activator.CreateInstance(upgradedImmortalSerializerType);

                                var upgradedDispatcherType = upgradeInterface.Assembly.GetType(upgradeInterface.FullName + "_Dispatcher_Implementation");

                                var untypedProxy = Activator.CreateInstance(upgradedDispatcherType, newImmortal, immortalSerializer, "", -1 /*ignored when not setting up connections*/, -1 /*ignored when not setting up connections*/, false);
                                var typedProxy = (Dispatcher)untypedProxy;

                                // Copy over all of the state from this Immortal to the new one
                                foreach (var f in typeof(Immortal).GetFields(BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public))
                                {
                                    f.SetValue(newImmortal, f.GetValue(this));
                                }

                                // IMPORTANT: But the value for the back pointer to the server proxy should be the newly generated proxy
                                newImmortal._dispatcher = typedProxy;

                                if (firstByte == AmbrosiaRuntimeLBConstants.upgradeTakeCheckpointByte)
                                {
                                    await newImmortal.TakeCheckpointAsync();
                                    newImmortal.IsPrimary = true;
                                    newImmortal.BecomingPrimary();
                                }

                                // Start the new Immortal: start its Dispatch loop (BUT, perhaps still reading from the same page)
                                var t = newImmortal.DispatchWrapper(bytesToRead);

                                // IMPORTANT: set the pointer from the old server proxy to the new one
                                // This allows the new proxy to get disposed when (and not until!) the old
                                // one is disposed.
                                this._dispatcher.upgradedProxy = typedProxy;

                                // Need to die now, so do that by exiting loop
                                t.Start(newImmortal.DispatchTaskScheduler);
                                return;

                            }

                        case AmbrosiaRuntimeLBConstants.RPCByte:
                        case AmbrosiaRuntimeLBConstants.RPCBatchByte:
                        case AmbrosiaRuntimeLBConstants.CountReplayableRPCBatchByte:
                            {
                                RPCsReceived++;
                                var numberOfRPCs = 1;
                                var lengthOfCurrentRPC = 0;
                                int endIndexOfCurrentRPC = 0;

                                if (firstByte == AmbrosiaRuntimeLBConstants.RPCBatchByte || firstByte == AmbrosiaRuntimeLBConstants.CountReplayableRPCBatchByte)
                                {
                                    _cursor++;
                                    numberOfRPCs = _inputFlexBuffer.Buffer.ReadBufferedInt(_cursor);
                                    _cursor += IntSize(numberOfRPCs);
                                    if (firstByte == AmbrosiaRuntimeLBConstants.CountReplayableRPCBatchByte)
                                    {
                                        var numReplayableRPCs = _inputFlexBuffer.Buffer.ReadBufferedInt(_cursor);
                                        _cursor += IntSize(numReplayableRPCs);
                                    }
                                    //Console.WriteLine($"ServerImmortal received batch RPC (#{RPCsReceived}) with {numberOfRPCs} RPCs");
                                }
                                else
                                {
                                    //endIndexOfCurrentRPC = _inputFlexBuffer.Buffer.Length;
                                    endIndexOfCurrentRPC = _inputFlexBuffer.Length;
                                    //Console.WriteLine($"ServerImmortal received single RPC (#{RPCsReceived}). End index: {endIndexOfCurrentRPC}");
                                }

                                for (int i = 0; i < numberOfRPCs; i++)
                                {
                                    if (1 < numberOfRPCs)
                                    {
                                        lengthOfCurrentRPC = _inputFlexBuffer.Buffer.ReadBufferedInt(_cursor);
                                        _cursor += IntSize(lengthOfCurrentRPC);
                                        endIndexOfCurrentRPC = _cursor + lengthOfCurrentRPC;
                                    }

                                    var shouldBeRPCByte = _inputFlexBuffer.Buffer[_cursor];
                                    if (shouldBeRPCByte != AmbrosiaRuntimeLBConstants.RPCByte)
                                    {
                                        Console.WriteLine("UNKNOWN BYTE: {0}!!", shouldBeRPCByte);
                                        throw new Exception("Illegal leading byte in message");
                                    }
                                    _cursor++;

                                    var returnValueType = (ReturnValueTypes)_inputFlexBuffer.Buffer[_cursor++];

                                    if (returnValueType != ReturnValueTypes.None) // receiving a return value
                                    {
                                        var senderOfRPCLength = _inputFlexBuffer.Buffer.ReadBufferedInt(_cursor);
                                        var sizeOfSender = IntSize(senderOfRPCLength);
                                        _cursor += sizeOfSender;
                                        var senderOfRPC = Encoding.UTF8.GetString(_inputFlexBuffer.Buffer, _cursor, senderOfRPCLength);
                                        _cursor += senderOfRPCLength;

                                        var sequenceNumber = _inputFlexBuffer.Buffer.ReadBufferedLong(_cursor);
                                        _cursor += LongSize(sequenceNumber);

                                        //Console.WriteLine("Received RPC call to method with id: {0} and seq no.: {1}", methodId, CurrentSequenceNumber);
#if DEBUG
                                        Console.WriteLine($"*X* Got response for {sequenceNumber} from {senderOfRPC}");
#endif

                                        if (this.CallCache.Data.TryRemove(sequenceNumber, out var taskCompletionSource))
                                        {
                                            switch (returnValueType)
                                            {
                                                case ReturnValueTypes.ReturnValue:
                                                    var deserializeNextValue = typeof(Immortal).GetMethods().FirstOrDefault(m => m.GetCustomAttributes(typeof(DeserializeNextValueAttribute)).Any());
                                                    if (deserializeNextValue != null)
                                                    {
                                                        var genericDeserializeNextValueMethod = deserializeNextValue.MakeGenericMethod(taskCompletionSource.ResultType.Type);
                                                        var result = genericDeserializeNextValueMethod.Invoke(this, null);

                                                        taskCompletionSource.SetResult(result);
                                                    }
                                                    break;
                                                case ReturnValueTypes.EmptyReturnValue:
                                                    taskCompletionSource.SetResult(GetDefault(taskCompletionSource.ResultType.Type));
                                                    break;
                                                case ReturnValueTypes.Exception:
                                                    var exceptionObj = DeserializeNextValue<object>();
                                                    var exception = new SerializableException(exceptionObj, senderOfRPC);
                                                    taskCompletionSource.SetException(exception);
                                                    break;
                                                default:
                                                    throw new ArgumentException($"Got an unfamiliar ReturnValueType: {returnValueType}");
                                            }
                                        }
                                        else
                                        {
                                            var errorMessage = $"Can't find sequence number {sequenceNumber} in cache";
                                            throw new InvalidOperationException(errorMessage);
                                        }

                                        await Task.Yield();
                                    }
                                    else // receiving an RPC
                                    {
                                        var methodId = _inputFlexBuffer.Buffer.ReadBufferedInt(_cursor);
                                        _cursor += IntSize(methodId);
                                        var rpcType = (RpcTypes.RpcType)_inputFlexBuffer.Buffer[_cursor++];

                                        string senderOfRPC = null;
                                        long sequenceNumber = 0;

                                        if (!rpcType.IsFireAndForget())
                                        {
                                            // read return address and sequence number
                                            var senderOfRPCLength = _inputFlexBuffer.Buffer.ReadBufferedInt(_cursor);
                                            var sizeOfSender = IntSize(senderOfRPCLength);
                                            _cursor += sizeOfSender;
                                            senderOfRPC = Encoding.UTF8.GetString(_inputFlexBuffer.Buffer, _cursor, senderOfRPCLength);
                                            _cursor += senderOfRPCLength;
                                            sequenceNumber = _inputFlexBuffer.Buffer.ReadBufferedLong(_cursor);
                                            _cursor += LongSize(sequenceNumber);
                                            //Console.WriteLine("Received RPC call to method with id: {0} and sequence number {1}", methodId, CurrentSequenceNumber);
                                        }
                                        else
                                        {

                                            //Console.WriteLine("Received fire-and-forget RPC call to method with id: {0}", methodId);
                                        }

                                        var lengthOfSerializedArguments = endIndexOfCurrentRPC - _cursor;
                                        byte[] localBuffer = new byte[lengthOfSerializedArguments];
                                        Buffer.BlockCopy(_inputFlexBuffer.Buffer, _cursor, localBuffer, 0, lengthOfSerializedArguments);

                                        //// BUGBUG: This works only if we are single-threaded and doing only fire-and-forget messages!
                                        //while (DispatchTaskScheduler.NumberOfScheduledTasks() == DispatchTaskScheduler.MaximumConcurrencyLevel)
                                        //{
                                        //    // just busy wait until there is a free thread in the scheduler
                                        //    // to handle this task.
                                        //}

                                        //Task.Factory.StartNew(
                                        //    () => _dispatcher.DispatchToMethod(methodId, fireAndForget, senderOfRPC, CurrentSequenceNumber, localBuffer, 0)
                                        //        , CancellationToken.None, TaskCreationOptions.DenyChildAttach
                                        //        , DispatchTaskScheduler
                                        //    );
                                        try
                                        {
                                            await _dispatcher.DispatchToMethod(methodId, rpcType, senderOfRPC, sequenceNumber, localBuffer, 0);
                                        }
                                        catch (Exception ex)
                                        {
                                            Console.Write(ex.Message);
                                        }

                                        _cursor += lengthOfSerializedArguments;
                                    }

                                }
                                break;
                            }

                        default:
                            {
                                var s = String.Format("Illegal leading byte in message: {0}", firstByte);
#if DEBUG
                                Console.WriteLine(s);
#endif
                                throw new Exception(s);
                            }
                    }

                    _inputFlexBuffer.ResetBuffer();
                }
            }
        }