internal void ProcessReplies()

in cs/remote/src/FASTER.client/ClientSession.cs [348:580]


        /// <summary>
        /// Processes one batch of replies that starts at <paramref name="buf"/>[<paramref name="offset"/>].
        /// The batch begins with a <c>BatchHeader</c> (sequence number and message count), followed by
        /// that many messages, each introduced by a one-byte <c>MessageType</c> tag. Every message is
        /// matched with the request state dequeued from the corresponding queue (or looked up by its
        /// pending sequence number) and completed through the user callbacks or the queued task
        /// completion source. After the whole batch has been handled, <c>numPendingBatches</c> is
        /// decremented.
        /// </summary>
        /// <param name="buf">Buffer that contains the reply batch.</param>
        /// <param name="offset">Index into <paramref name="buf"/> at which the batch header starts.</param>
        /// <exception cref="Exception">
        /// The batch sequence number is not <c>lastSeqNo + 1</c> on a non-subscription session, or a
        /// subscription-type message carries a status other than the ones handled below.
        /// </exception>
        /// <exception cref="NotImplementedException">The message type tag is not handled.</exception>
        internal void ProcessReplies(byte[] buf, int offset)
        {
            // Passed by ref to callbacks on paths where the reply carries no output payload.
            Output defaultOutput = default;
            fixed (byte* b = &buf[offset])
            {
                var src = b;
                var seqNo = ((BatchHeader*)src)->SeqNo;
                var count = ((BatchHeader*)src)->NumMessages;

                // Batches must arrive in order, except on subscription sessions where the check is skipped.
                if (seqNo != lastSeqNo + 1 && !subscriptionSession)
                    throw new Exception("Out of order message within session");
                lastSeqNo = seqNo;

                src += BatchHeader.Size;

                for (int i = 0; i < count; i++)
                {
                    // The first byte of every message is its type tag; src then points at the status.
                    switch ((MessageType)(*src++))
                    {
                        case MessageType.Upsert:
                            {
                                // The status is read only to advance the cursor; the callback takes no status.
                                ReadStatus(ref src);
                                (Key, Value, Context) result = upsertQueue.Dequeue();
                                functions.UpsertCompletionCallback(ref result.Item1, ref result.Item2, result.Item3);
                                break;
                            }
                        case MessageType.UpsertAsync:
                            {
                                var status = ReadStatus(ref src);
                                // The queued request state is not needed for the async result, but must be popped
                                // to keep the queue aligned with the reply stream.
                                upsertQueue.Dequeue();
                                var tcs = tcsQueue.Dequeue();
                                tcs.SetResult((status, default));
                                break;
                            }
                        case MessageType.Read:
                            {
                                var status = ReadStatus(ref src);
                                var result = readrmwQueue.Dequeue();
                                if (status == Status.OK)
                                {
                                    result.Item3 = serializer.ReadOutput(ref src);
                                    functions.ReadCompletionCallback(ref result.Item1, ref result.Item2, ref result.Item3, result.Item4, status);
                                }
                                else if (status == Status.PENDING)
                                {
                                    // Park the request state under the pending sequence number carried by the reply.
                                    var p = hrw.ReadPendingSeqNo(ref src);
                                    readRmwPendingContext.Add(p, result);
                                }
                                else
                                    functions.ReadCompletionCallback(ref result.Item1, ref result.Item2, ref defaultOutput, result.Item4, status);

                                break;
                            }
                        case MessageType.ReadAsync:
                            {
                                var status = ReadStatus(ref src);
                                // Popped only to keep the queue aligned; the async path completes through the tcs.
                                readrmwQueue.Dequeue();
                                var tcs = tcsQueue.Dequeue();
                                if (status == Status.OK)
                                    tcs.SetResult((status, serializer.ReadOutput(ref src)));
                                else if (status == Status.PENDING)
                                {
                                    // Park the tcs under the pending sequence number carried by the reply.
                                    var p = hrw.ReadPendingSeqNo(ref src);
                                    readRmwPendingTcs.Add(p, tcs);
                                }
                                else
                                    tcs.SetResult((status, default));
                                break;
                            }
                        case MessageType.RMW:
                            {
                                var status = ReadStatus(ref src);
                                var result = readrmwQueue.Dequeue();
                                // Unlike Read, an RMW reply carries an output for NOTFOUND as well as OK.
                                if (status == Status.OK || status == Status.NOTFOUND)
                                {
                                    result.Item3 = serializer.ReadOutput(ref src);
                                    functions.RMWCompletionCallback(ref result.Item1, ref result.Item2, ref result.Item3, result.Item4, status);
                                }
                                else if (status == Status.PENDING)
                                {
                                    var p = hrw.ReadPendingSeqNo(ref src);
                                    readRmwPendingContext.Add(p, result);
                                }
                                else
                                    functions.RMWCompletionCallback(ref result.Item1, ref result.Item2, ref defaultOutput, result.Item4, status);
                                break;
                            }
                        case MessageType.RMWAsync:
                            {
                                var status = ReadStatus(ref src);
                                // Popped only to keep the queue aligned; the async path completes through the tcs.
                                readrmwQueue.Dequeue();
                                var tcs = tcsQueue.Dequeue();
                                if (status == Status.OK || status == Status.NOTFOUND)
                                    tcs.SetResult((status, serializer.ReadOutput(ref src)));
                                else if (status == Status.PENDING)
                                {
                                    var p = hrw.ReadPendingSeqNo(ref src);
                                    readRmwPendingTcs.Add(p, tcs);
                                }
                                else
                                    tcs.SetResult((status, default));
                                break;
                            }
                        case MessageType.Delete:
                            {
                                // The status is read only to advance the cursor; the callback takes no status.
                                ReadStatus(ref src);
                                (Key, Value, Context) result = upsertQueue.Dequeue();
                                functions.DeleteCompletionCallback(ref result.Item1, result.Item3);
                                break;
                            }
                        case MessageType.DeleteAsync:
                            {
                                var status = ReadStatus(ref src);
                                // Popped only to keep the queue aligned with the reply stream.
                                upsertQueue.Dequeue();
                                var tcs = tcsQueue.Dequeue();
                                tcs.SetResult((status, default));
                                break;
                            }
                        case MessageType.SubscribeKV:
                            {
                                var status = ReadStatus(ref src);
                                var p = hrw.ReadPendingSeqNo(ref src);
                                // NOTE(review): the TryGetValue results below are not checked; if p is not
                                // registered, the callback runs with a default tuple - confirm this is intended.
                                if (status == Status.OK)
                                {
                                    readRmwPendingContext.TryGetValue(p, out var result);
                                    result.Item3 = serializer.ReadOutput(ref src);
                                    functions.SubscribeKVCallback(ref result.Item1, ref result.Item2, ref result.Item3, result.Item4, Status.OK);
                                }
                                else if (status == Status.NOTFOUND)
                                {
                                    readRmwPendingContext.TryGetValue(p, out var result);
                                    functions.SubscribeKVCallback(ref result.Item1, ref result.Item2, ref defaultOutput, result.Item4, Status.NOTFOUND);
                                }
                                else if (status == Status.PENDING)
                                {
                                    // Register the queued request state under p so later OK/NOTFOUND replies can find it.
                                    var result = readrmwQueue.Dequeue();
                                    readRmwPendingContext.Add(p, result);
                                }
                                else
                                {
                                    throw new Exception("Unexpected status of SubscribeKV");
                                }
                                break;
                            }
                        case MessageType.PSubscribeKV:
                            {
                                var status = ReadStatus(ref src);
                                var p = hrw.ReadPendingSeqNo(ref src);
                                // Same flow as SubscribeKV, except that OK/NOTFOUND replies also carry the key,
                                // which overwrites the key stored in the looked-up context copy.
                                // NOTE(review): TryGetValue results are not checked - confirm this is intended.
                                if (status == Status.OK)
                                {
                                    readRmwPendingContext.TryGetValue(p, out var result);
                                    result.Item1 = serializer.ReadKey(ref src);
                                    result.Item3 = serializer.ReadOutput(ref src);
                                    functions.SubscribeKVCallback(ref result.Item1, ref result.Item2, ref result.Item3, result.Item4, Status.OK);
                                }
                                else if (status == Status.NOTFOUND)
                                {
                                    readRmwPendingContext.TryGetValue(p, out var result);
                                    result.Item1 = serializer.ReadKey(ref src);
                                    functions.SubscribeKVCallback(ref result.Item1, ref result.Item2, ref defaultOutput, result.Item4, Status.NOTFOUND);
                                }
                                else if (status == Status.PENDING)
                                {
                                    var result = readrmwQueue.Dequeue();
                                    readRmwPendingContext.Add(p, result);
                                }
                                else
                                {
                                    throw new Exception("Unexpected status of PSubscribeKV");
                                }
                                break;
                            }
                        case MessageType.Publish:
                            {
                                // The status is read only to advance the cursor; the callback takes no status.
                                ReadStatus(ref src);
                                (Key, Value, Context) result = upsertQueue.Dequeue();
                                functions.PublishCompletionCallback(ref result.Item1, ref result.Item2, result.Item3);
                                break;
                            }
                        case MessageType.Subscribe:
                            {
                                var status = ReadStatus(ref src);
                                var p = hrw.ReadPendingSeqNo(ref src);
                                if (status == Status.OK)
                                {
                                    // NOTE(review): TryGetValue result is not checked - confirm this is intended.
                                    pubsubPendingContext.TryGetValue(p, out var result);
                                    result.Item2 = serializer.ReadValue(ref src);
                                    functions.SubscribeCallback(ref result.Item1, ref result.Item2, result.Item3);
                                }
                                else if (status == Status.PENDING)
                                {
                                    // Register the queued request state under p so later OK replies can find it.
                                    var result = pubsubQueue.Dequeue();
                                    pubsubPendingContext.Add(p, result);
                                }
                                else
                                {
                                    throw new Exception("Unexpected status of Subscribe");
                                }
                                break;
                            }
                        case MessageType.PSubscribe:
                            {
                                var status = ReadStatus(ref src);
                                var p = hrw.ReadPendingSeqNo(ref src);
                                if (status == Status.OK)
                                {
                                    // Same as Subscribe, except that the reply also carries the key.
                                    // NOTE(review): TryGetValue result is not checked - confirm this is intended.
                                    pubsubPendingContext.TryGetValue(p, out var result);
                                    result.Item1 = serializer.ReadKey(ref src);
                                    result.Item2 = serializer.ReadValue(ref src);
                                    functions.SubscribeCallback(ref result.Item1, ref result.Item2, result.Item3);
                                }
                                else if (status == Status.PENDING)
                                {
                                    var result = pubsubQueue.Dequeue();
                                    pubsubPendingContext.Add(p, result);
                                }
                                else
                                {
                                    throw new Exception("Unexpected status of PSubscribe");
                                }
                                break;
                            }
                        case MessageType.PendingResult:
                            {
                                HandlePending(ref src);
                                break;
                            }
                        default:
                            throw new NotImplementedException();
                    }
                }
            }

            // Only reached when the whole batch was processed without an exception.
            Interlocked.Decrement(ref numPendingBatches);
        }