private unsafe void ProcessBatch()

in cs/remote/src/FASTER.server/BinaryServerSession.cs [112:224]


        /// <summary>
        /// Processes one batch of request messages found in <paramref name="buf"/> starting at
        /// <paramref name="offset"/>: reads the batch header, executes each message (Upsert, Read,
        /// RMW, Delete, or a pub/sub message via <c>HandlePubSub</c>) against <c>session</c>, and
        /// writes the per-message replies into the response buffer before sending it.
        /// </summary>
        /// <param name="buf">Buffer holding the incoming batch (batch header followed by messages).</param>
        /// <param name="offset">Index in <paramref name="buf"/> at which the batch header begins.</param>
        /// <exception cref="NotImplementedException">
        /// Thrown when a message type is not one of the cases handled here and <c>HandlePubSub</c>
        /// returns false for it.
        /// </exception>
        private unsafe void ProcessBatch(byte[] buf, int offset)
        {
            // NOTE(review): presumably acquires/initializes responseObject, which is used below — confirm.
            GetResponseObject();

            // Pin the input buffer so raw pointers into it stay valid while the batch is parsed.
            fixed (byte* b = &buf[offset])
            {
                // d / dend delimit the response buffer; dcurr is the current write position in it.
                byte* d = responseObject.bufferPtr;
                var dend = d + responseObject.buffer.Length;
                dcurr = d + sizeof(int); // reserve space for size
                // Remember the pending sequence number so we can tell below whether any
                // operation in this batch went pending.
                int origPendingSeqNo = pendingSeqNo;

                // Reinterpret the start of the input as the batch header and step past it.
                var src = b;
                ref var header = ref Unsafe.AsRef<BatchHeader>(src);
                var num = header.NumMessages;
                src += BatchHeader.Size;
                Status status = default;

                // Skip a BatchHeader-sized region in the response as well.
                // NOTE(review): presumably filled in later by Send/SendAndReset — confirm.
                dcurr += BatchHeader.Size;
                start = 0;
                msgnum = 0;

                // msgnum is a field (not a local); it is re-initialized by the loop header.
                for (msgnum = 0; msgnum < num; msgnum++)
                {
                    // Each message begins with a one-byte message type followed by a serial number.
                    var message = (MessageType)(*src++);
                    var serialNum = hrw.ReadSerialNum(ref src);
                    switch (message)
                    {
                        case MessageType.Upsert:
                        case MessageType.UpsertAsync:
                            // Need room for the reply written below (message type + status);
                            // otherwise hand off to SendAndReset, which receives d and dend by ref.
                            if ((int)(dend - dcurr) < 2)
                                SendAndReset(ref d, ref dend);

                            // Remember where the key starts so it can be published after the operation.
                            var keyPtr = src;
                            // Arguments are evaluated left to right, so the key is consumed from src
                            // before the value.
                            status = session.Upsert(ref serializer.ReadKeyByRef(ref src), ref serializer.ReadValueByRef(ref src), serialNo: serialNum);

                            // Reply: message type followed by status.
                            hrw.Write(message, ref dcurr, (int)(dend - dcurr));
                            Write(ref status, ref dcurr, (int)(dend - dcurr));

                            // Notify the subscription broker (if one is configured) about the touched key.
                            subscribeKVBroker?.Publish(keyPtr);
                            break;

                        case MessageType.Read:
                        case MessageType.ReadAsync:
                            // Need room for message type + status plus the largest possible output.
                            if ((int)(dend - dcurr) < 2 + maxSizeSettings.MaxOutputSize)
                                SendAndReset(ref d, ref dend);

                            // Context packs the message type in the upper 32 bits and the current
                            // pending sequence number in the lower 32 bits.
                            long ctx = ((long)message << 32) | (long)pendingSeqNo;
                            // The output reference points at dcurr + 2, i.e. just past the two bytes
                            // budgeted above for the message type and status that are written afterwards.
                            status = session.Read(ref serializer.ReadKeyByRef(ref src), ref serializer.ReadInputByRef(ref src),
                                ref serializer.AsRefOutput(dcurr + 2, (int)(dend - dcurr)), ctx, serialNum);

                            hrw.Write(message, ref dcurr, (int)(dend - dcurr));
                            Write(ref status, ref dcurr, (int)(dend - dcurr));

                            // PENDING: emit the sequence number (then bump it) in place of an output.
                            // OK: advance dcurr past the output already placed in the buffer.
                            // Any other status: nothing follows the status.
                            if (status == Status.PENDING)
                                Write(pendingSeqNo++, ref dcurr, (int)(dend - dcurr));
                            else if (status == Status.OK)
                                serializer.SkipOutput(ref dcurr);
                            break;

                        case MessageType.RMW:
                        case MessageType.RMWAsync:
                            // Same space requirement as Read: message type + status + max output.
                            if ((int)(dend - dcurr) < 2 + maxSizeSettings.MaxOutputSize)
                                SendAndReset(ref d, ref dend);

                            // Remember where the key starts so it can be published after the operation.
                            keyPtr = src;

                            // Same context packing and in-place output location as the Read case.
                            ctx = ((long)message << 32) | (long)pendingSeqNo;
                            status = session.RMW(ref serializer.ReadKeyByRef(ref src), ref serializer.ReadInputByRef(ref src),
                                ref serializer.AsRefOutput(dcurr + 2, (int)(dend - dcurr)), ctx, serialNum);

                            hrw.Write(message, ref dcurr, (int)(dend - dcurr));
                            Write(ref status, ref dcurr, (int)(dend - dcurr));
                            // Unlike Read, the output is also skipped over when the status is NOTFOUND.
                            // NOTE(review): presumably RMW produces an output on NOTFOUND as well — confirm.
                            if (status == Status.PENDING)
                                Write(pendingSeqNo++, ref dcurr, (int)(dend - dcurr));
                            else if (status == Status.OK || status == Status.NOTFOUND)
                                serializer.SkipOutput(ref dcurr);

                            subscribeKVBroker?.Publish(keyPtr);
                            break;

                        case MessageType.Delete:
                        case MessageType.DeleteAsync:
                            // Reply is only message type + status.
                            if ((int)(dend - dcurr) < 2)
                                SendAndReset(ref d, ref dend);

                            keyPtr = src;
                            status = session.Delete(ref serializer.ReadKeyByRef(ref src), serialNo: serialNum);

                            hrw.Write(message, ref dcurr, (int)(dend - dcurr));
                            Write(ref status, ref dcurr, (int)(dend - dcurr));

                            subscribeKVBroker?.Publish(keyPtr);
                            break;

                        default:
                            // Anything else is offered to the pub/sub handler; if it does not
                            // accept the message type, the batch is aborted with an exception.
                            if (!HandlePubSub(message, ref src, ref d, ref dend)) throw new NotImplementedException();
                            break;
                    }
                }

                // pendingSeqNo only advances when an operation returned PENDING, so a change
                // means this batch left pending operations to complete.
                // NOTE(review): the `true` argument presumably requests waiting for completion — confirm.
                if (origPendingSeqNo != pendingSeqNo)
                    session.CompletePending(true);

                // Send replies
                // If no messages remain unsent since `start`, there is nothing to send:
                // give the response object back to the message manager and drop our reference.
                // NOTE(review): `start` is presumably advanced by SendAndReset — confirm.
                if (msgnum - start > 0)
                    Send(d);
                else
                {
                    messageManager.Return(responseObject);
                    responseObject = null;
                }
            }
        }