private unsafe bool ProcessBatch()

in cs/remote/src/FASTER.server/WebsocketServerSession.cs [181:545]


        /// <summary>
        /// Processes the data starting at <paramref name="offset"/> in <paramref name="buf"/>. If the data
        /// starts with "GET" it is treated as a WebSocket opening handshake and answered with an HTTP 101
        /// response. Otherwise it is parsed as one (possibly fragmented) masked WebSocket message whose
        /// unmasked payload carries a batch of operations that are executed against the session.
        /// </summary>
        /// <param name="buf">Buffer holding the bytes received from the client.</param>
        /// <param name="offset">Index in <paramref name="buf"/> of the first byte to process.</param>
        /// <returns>
        /// <c>false</c> when the first frame of a fragmented message declares a length larger than
        /// <c>bytesRead</c> (<c>readHead</c> is left unchanged in that case); <c>true</c> otherwise.
        /// </returns>
        /// <exception cref="NotImplementedException">The batch contains an unsupported message type.</exception>
        private unsafe bool ProcessBatch(byte[] buf, int offset)
        {
            GetResponseObject();

            byte* d = responseObject.bufferPtr;
            var dend = d + responseObject.buffer.Length;
            dcurr = d;

            // ASCII 71, 69, 84 == "GET": an HTTP request, i.e. the WebSocket opening handshake
            if (buf[offset] == 71 && buf[offset + 1] == 69 && buf[offset + 2] == 84)
            {
                // 1. Obtain the value of the "Sec-WebSocket-Key" request header without any leading or trailing whitespace
                // 2. Concatenate it with "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" (a special GUID specified by RFC 6455)
                // 3. Compute SHA-1 and Base64 hash of the new value
                // 4. Write the hash back as the value of "Sec-WebSocket-Accept" response header in an HTTP response
                string s = Encoding.UTF8.GetString(buf, offset, buf.Length - offset);
                string swk = Regex.Match(s, "Sec-WebSocket-Key: (.*)").Groups[1].Value.Trim();
                string swka = swk + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

                byte[] swkaSha1;
                using (var sha1 = System.Security.Cryptography.SHA1.Create())
                    swkaSha1 = sha1.ComputeHash(Encoding.UTF8.GetBytes(swka));
                string swkaSha1Base64 = Convert.ToBase64String(swkaSha1);

                // HTTP/1.1 defines the sequence CR LF as the end-of-line marker
                byte[] response = Encoding.UTF8.GetBytes(
                    "HTTP/1.1 101 Switching Protocols\r\n" +
                    "Connection: Upgrade\r\n" +
                    "Upgrade: websocket\r\n" +
                    "Sec-WebSocket-Accept: " + swkaSha1Base64 + "\r\n\r\n");

                // Pass the real remaining capacity so MemoryCopy can reject an oversized copy
                fixed (byte* responsePtr = &response[0])
                    Buffer.MemoryCopy(responsePtr, dcurr, dend - dcurr, response.Length);

                dcurr += response.Length;

                SendResponse((int)(d - responseObject.bufferPtr), (int)(dcurr - d));
                readHead = bytesRead;
                return true;
            }

            // Otherwise: a WebSocket data message made of one or more frames
            List<Decoder> decoderInfoList = new();
            var totalMsgLen = 0;
            var cursor = offset;

            bool fin = ReadWebSocketFrameHeader(buf, ref cursor, out var frame);

            if (frame.msgLen == 0)
                Console.WriteLine("msglen == 0");

            decoderInfoList.Add(frame);
            totalMsgLen += frame.msgLen;

            if (!fin)
            {
                // The first four unmasked payload bytes hold a length written by the client; if it
                // exceeds what has been read so far, report an incomplete command without consuming input.
                byte[] decodedClientMsgLen = new byte[sizeof(Int32)];
                for (int i = 0; i < sizeof(Int32); ++i)
                    decodedClientMsgLen[i] = (byte)(buf[frame.dataStart + i] ^ buf[frame.maskStart + (i % 4)]);

                if (BitConverter.ToInt32(decodedClientMsgLen, 0) > bytesRead)
                    return false;
            }

            // Walk the continuation frames until the one flagged FIN
            while (!fin)
            {
                cursor += frame.msgLen; // skip the previous frame's payload
                fin = ReadWebSocketFrameHeader(buf, ref cursor, out frame);
                decoderInfoList.Add(frame);
                totalMsgLen += frame.msgLen;
            }

            cursor += frame.msgLen; // one past the payload of the final frame

            // Unmask every frame's payload into a single contiguous buffer
            byte[] decoded = new byte[totalMsgLen];
            var decodedIndex = 0;
            foreach (var part in decoderInfoList)
            {
                for (int i = 0; i < part.msgLen; ++i)
                    decoded[decodedIndex++] = (byte)(buf[part.dataStart + i] ^ buf[part.maskStart + (i % 4)]);
            }

            readHead = cursor;

            dcurr = d;
            dcurr += 10; // NOTE(review): presumably room for the outgoing WebSocket frame header - confirm against Send
            dcurr += sizeof(int); // reserve space for size
            int origPendingSeqNo = pendingSeqNo;

            dcurr += BatchHeader.Size;
            start = 0;
            msgnum = 0;

            // The batch header starts after the 4-byte length prefix of the payload.
            // Throws IndexOutOfRangeException when the payload has four bytes or fewer.
            fixed (byte* ptr1 = &decoded[4])
            {
                var src = ptr1;
                int num = *(int*)(src + 4); // message count, read at byte offset 4 of the batch header
                src += BatchHeader.Size;
                Status status = default;

                for (msgnum = 0; msgnum < num; msgnum++)
                {
                    var message = (MessageType)(*src++);

                    switch (message)
                    {
                        case MessageType.Upsert:
                        case MessageType.UpsertAsync:
                            if ((int)(dend - dcurr) < 2)
                                SendAndReset(ref d, ref dend);

                            var keyPtr = src;
                            status = session.Upsert(ref serializer.ReadKeyByRef(ref src), ref serializer.ReadValueByRef(ref src));

                            hrw.Write(message, ref dcurr, (int)(dend - dcurr));
                            Write(ref status, ref dcurr, (int)(dend - dcurr));

                            if (subscribeKVBroker != null)
                                subscribeKVBroker.Publish(keyPtr);
                            break;

                        case MessageType.Read:
                        case MessageType.ReadAsync:
                            if ((int)(dend - dcurr) < 2 + maxSizeSettings.MaxOutputSize)
                                SendAndReset(ref d, ref dend);

                            // Context packs the message type in the high 32 bits and the pending sequence number in the low bits
                            long ctx = ((long)message << 32) | (long)pendingSeqNo;
                            status = session.Read(ref serializer.ReadKeyByRef(ref src), ref serializer.ReadInputByRef(ref src),
                                ref serializer.AsRefOutput(dcurr + 2, (int)(dend - dcurr)), ctx, 0);

                            hrw.Write(message, ref dcurr, (int)(dend - dcurr));
                            Write(ref status, ref dcurr, (int)(dend - dcurr));

                            if (status == core.Status.PENDING)
                                Write(pendingSeqNo++, ref dcurr, (int)(dend - dcurr));
                            else if (status == core.Status.OK)
                                serializer.SkipOutput(ref dcurr);

                            break;

                        case MessageType.RMW:
                        case MessageType.RMWAsync:
                            if ((int)(dend - dcurr) < 2)
                                SendAndReset(ref d, ref dend);

                            keyPtr = src;

                            ctx = ((long)message << 32) | (long)pendingSeqNo;
                            status = session.RMW(ref serializer.ReadKeyByRef(ref src), ref serializer.ReadInputByRef(ref src), ctx);

                            hrw.Write(message, ref dcurr, (int)(dend - dcurr));
                            Write(ref status, ref dcurr, (int)(dend - dcurr));
                            if (status == Status.PENDING)
                                Write(pendingSeqNo++, ref dcurr, (int)(dend - dcurr));

                            if (subscribeKVBroker != null)
                                subscribeKVBroker.Publish(keyPtr);
                            break;

                        case MessageType.Delete:
                        case MessageType.DeleteAsync:
                            if ((int)(dend - dcurr) < 2)
                                SendAndReset(ref d, ref dend);

                            keyPtr = src;

                            status = session.Delete(ref serializer.ReadKeyByRef(ref src));

                            hrw.Write(message, ref dcurr, (int)(dend - dcurr));
                            Write(ref status, ref dcurr, (int)(dend - dcurr));

                            if (subscribeKVBroker != null)
                                subscribeKVBroker.Publish(keyPtr);
                            break;

                        case MessageType.SubscribeKV:
                            Debug.Assert(subscribeKVBroker != null);

                            if ((int)(dend - dcurr) < 2 + maxSizeSettings.MaxOutputSize)
                                SendAndReset(ref d, ref dend);

                            var keyStart = src;
                            ref Key key = ref serializer.ReadKeyByRef(ref src);

                            var inputStart = src;
                            // Called for its effect on src: advances past the serialized input
                            ref Input input = ref serializer.ReadInputByRef(ref src);

                            int sid = subscribeKVBroker.Subscribe(ref keyStart, ref inputStart, this);
                            status = Status.PENDING;

                            hrw.Write(message, ref dcurr, (int)(dend - dcurr));
                            Write(ref status, ref dcurr, (int)(dend - dcurr));
                            Write(sid, ref dcurr, (int)(dend - dcurr));
                            serializer.Write(ref key, ref dcurr, (int)(dend - dcurr));

                            break;

                        case MessageType.PSubscribeKV:
                            Debug.Assert(subscribeKVBroker != null);

                            if ((int)(dend - dcurr) < 2 + maxSizeSettings.MaxOutputSize)
                                SendAndReset(ref d, ref dend);

                            keyStart = src;
                            key = ref serializer.ReadKeyByRef(ref src);

                            inputStart = src;
                            input = ref serializer.ReadInputByRef(ref src);

                            sid = subscribeKVBroker.PSubscribe(ref keyStart, ref inputStart, this);
                            status = Status.PENDING;

                            hrw.Write(message, ref dcurr, (int)(dend - dcurr));
                            Write(ref status, ref dcurr, (int)(dend - dcurr));
                            Write(sid, ref dcurr, (int)(dend - dcurr));
                            serializer.Write(ref key, ref dcurr, (int)(dend - dcurr));

                            break;

                        case MessageType.Publish:
                            Debug.Assert(subscribeBroker != null);

                            if ((int)(dend - dcurr) < 2)
                                SendAndReset(ref d, ref dend);

                            keyPtr = src;
                            key = ref serializer.ReadKeyByRef(ref src);
                            byte* valPtr = src;
                            // Called for its effect on src, which is what determines valueLength below
                            ref Value val = ref serializer.ReadValueByRef(ref src);
                            int valueLength = (int)(src - valPtr);

                            status = Status.OK;
                            hrw.Write(message, ref dcurr, (int)(dend - dcurr));
                            Write(ref status, ref dcurr, (int)(dend - dcurr));

                            if (subscribeBroker != null)
                                subscribeBroker.Publish(keyPtr, valPtr, valueLength);
                            break;

                        case MessageType.Subscribe:
                            Debug.Assert(subscribeBroker != null);

                            if ((int)(dend - dcurr) < 2 + maxSizeSettings.MaxOutputSize)
                                SendAndReset(ref d, ref dend);

                            keyStart = src;
                            serializer.ReadKeyByRef(ref src);

                            sid = subscribeBroker.Subscribe(ref keyStart, this);
                            status = Status.PENDING;
                            hrw.Write(message, ref dcurr, (int)(dend - dcurr));
                            Write(ref status, ref dcurr, (int)(dend - dcurr));
                            Write(sid, ref dcurr, (int)(dend - dcurr));
                            break;

                        case MessageType.PSubscribe:
                            Debug.Assert(subscribeBroker != null);

                            if ((int)(dend - dcurr) < 2 + maxSizeSettings.MaxOutputSize)
                                SendAndReset(ref d, ref dend);

                            keyStart = src;
                            serializer.ReadKeyByRef(ref src);

                            sid = subscribeBroker.PSubscribe(ref keyStart, this);
                            status = Status.PENDING;
                            hrw.Write(message, ref dcurr, (int)(dend - dcurr));
                            Write(ref status, ref dcurr, (int)(dend - dcurr));
                            Write(sid, ref dcurr, (int)(dend - dcurr));
                            break;

                        default:
                            throw new NotImplementedException();
                    }
                }
            }

            // At least one operation went pending in this batch: complete them before replying
            if (origPendingSeqNo != pendingSeqNo)
                session.CompletePending(true);

            // Send replies
            if (msgnum - start > 0)
                Send(d);
            else
            {
                messageManager.Return(responseObject);
                responseObject = null;
            }

            return true;
        }

        /// <summary>
        /// Parses the header of the masked WebSocket frame that begins at <paramref name="cursor"/>.
        /// </summary>
        /// <param name="buf">Buffer containing the frame.</param>
        /// <param name="cursor">
        /// On entry, the index of the frame's first byte; on exit, the index of its first payload byte.
        /// </param>
        /// <param name="frame">Receives the payload length and the mask-key and payload start indices.</param>
        /// <returns><c>true</c> when the frame has the FIN bit set (last frame of the message).</returns>
        private static bool ReadWebSocketFrameHeader(byte[] buf, ref int cursor, out Decoder frame)
        {
            bool fin = (buf[cursor] & 0b10000000) != 0;

            // Subtracting 128 removes the MASK bit, which every client-to-server frame must set
            int payloadLen = buf[cursor + 1] - 128;
            int maskStart;

            if (payloadLen <= 125)
            {
                // 0..125 is the payload length itself
                maskStart = cursor + 2;
            }
            else if (payloadLen == 126)
            {
                // The next 2 bytes hold the length, big-endian
                payloadLen = (buf[cursor + 2] << 8) | buf[cursor + 3];
                maskStart = cursor + 4;
            }
            else
            {
                // 127: the next 8 bytes hold the length, big-endian
                ulong extendedLen = 0;
                for (int i = 2; i < 10; i++)
                    extendedLen = (extendedLen << 8) | buf[cursor + i];
                payloadLen = (int)extendedLen;
                maskStart = cursor + 10;
            }

            frame = new Decoder();
            frame.maskStart = maskStart;
            frame.msgLen = payloadLen;
            frame.dataStart = maskStart + 4; // the masking key is 4 bytes
            cursor = frame.dataStart;
            return fin;
        }