in cs/remote/src/FASTER.server/BinaryServerSession.cs [112:224]
/// <summary>
/// Processes one batch of request messages found in <paramref name="buf"/> starting at
/// <paramref name="offset"/>: reads the batch header, executes each message against
/// <c>session</c> (Upsert / Read / RMW / Delete; any other type is handed to
/// <c>HandlePubSub</c>), and for each store operation writes the message type and status
/// (plus the output or a pending sequence number where applicable) into the response buffer.
/// </summary>
/// <param name="buf">Byte array holding the incoming batch.</param>
/// <param name="offset">Index in <paramref name="buf"/> at which the batch header begins.</param>
/// <exception cref="NotImplementedException">
/// Thrown when a message type is not handled by the switch and <c>HandlePubSub</c> returns false.
/// </exception>
private unsafe void ProcessBatch(byte[] buf, int offset)
{
// Presumably acquires the response buffer that is accessed through responseObject below - confirm.
GetResponseObject();
// Pin the input so it can be walked with raw pointers for the duration of the batch.
fixed (byte* b = &buf[offset])
{
// d / dend delimit the response buffer; dcurr (declared outside this method) is the write cursor.
byte* d = responseObject.bufferPtr;
var dend = d + responseObject.buffer.Length;
dcurr = d + sizeof(int); // reserve space for size
// Snapshot so we can tell after the loop whether any operation bumped pendingSeqNo (i.e. went pending).
int origPendingSeqNo = pendingSeqNo;
// src is the read cursor over the request; it starts at the batch header.
var src = b;
ref var header = ref Unsafe.AsRef<BatchHeader>(src);
// NOTE(review): num comes straight from the input and no bound on src against the end of buf is visible here.
var num = header.NumMessages;
src += BatchHeader.Size;
Status status = default;
// Leave room for a batch header in the response as well (presumably filled in by Send / SendAndReset - confirm).
dcurr += BatchHeader.Size;
// start and msgnum are declared outside this method; their difference is used after the loop to decide
// whether anything remains to be sent (presumably SendAndReset advances start - confirm).
start = 0;
// Redundant with the for-initializer below; kept as in the original.
msgnum = 0;
for (msgnum = 0; msgnum < num; msgnum++)
{
// Each message begins with a one-byte message type, followed by a serial number read by hrw.
var message = (MessageType)(*src++);
var serialNum = hrw.ReadSerialNum(ref src);
switch (message)
{
case MessageType.Upsert:
case MessageType.UpsertAsync:
// Flush the response buffer if fewer than 2 bytes remain
// (presumably one byte each for the message type and status written below - confirm).
if ((int)(dend - dcurr) < 2)
SendAndReset(ref d, ref dend);
// Remember where the key starts; src is passed by ref to the serializer reads below.
var keyPtr = src;
status = session.Upsert(ref serializer.ReadKeyByRef(ref src), ref serializer.ReadValueByRef(ref src), serialNo: serialNum);
// Reply: message type followed by status.
hrw.Write(message, ref dcurr, (int)(dend - dcurr));
Write(ref status, ref dcurr, (int)(dend - dcurr));
// Publish the key to the broker, if one is configured. Note this runs regardless of status.
subscribeKVBroker?.Publish(keyPtr);
break;
case MessageType.Read:
case MessageType.ReadAsync:
// Flush if there is not enough room for the 2 leading bytes plus a maximum-size output.
if ((int)(dend - dcurr) < 2 + maxSizeSettings.MaxOutputSize)
SendAndReset(ref d, ref dend);
// Context packs the message type into the upper 32 bits and pendingSeqNo into the lower 32 bits
// (a negative pendingSeqNo would sign-extend into the upper half - assumed non-negative).
long ctx = ((long)message << 32) | (long)pendingSeqNo;
// The output reference points into the response buffer at dcurr + 2, i.e. past the space
// the two writes below are expected to fill; this must run before those writes advance dcurr.
// NOTE(review): the length passed is dend - dcurr even though the output begins 2 bytes
// later - confirm whether AsRefOutput expects the 2-byte offset to be subtracted.
status = session.Read(ref serializer.ReadKeyByRef(ref src), ref serializer.ReadInputByRef(ref src),
ref serializer.AsRefOutput(dcurr + 2, (int)(dend - dcurr)), ctx, serialNum);
// Reply: message type followed by status.
hrw.Write(message, ref dcurr, (int)(dend - dcurr));
Write(ref status, ref dcurr, (int)(dend - dcurr));
// PENDING: write the sequence number that was packed into ctx, then increment it.
// OK: move dcurr past the output (presumably the one produced in place above - confirm).
// Any other status: nothing further is written.
if (status == Status.PENDING)
Write(pendingSeqNo++, ref dcurr, (int)(dend - dcurr));
else if (status == Status.OK)
serializer.SkipOutput(ref dcurr);
break;
case MessageType.RMW:
case MessageType.RMWAsync:
// Same space requirement as Read: 2 leading bytes plus a maximum-size output.
if ((int)(dend - dcurr) < 2 + maxSizeSettings.MaxOutputSize)
SendAndReset(ref d, ref dend);
// keyPtr and ctx are the variables declared in the Upsert and Read sections above (shared switch scope).
keyPtr = src;
ctx = ((long)message << 32) | (long)pendingSeqNo;
// Output is again targeted at dcurr + 2, before the writes below advance dcurr.
status = session.RMW(ref serializer.ReadKeyByRef(ref src), ref serializer.ReadInputByRef(ref src),
ref serializer.AsRefOutput(dcurr + 2, (int)(dend - dcurr)), ctx, serialNum);
hrw.Write(message, ref dcurr, (int)(dend - dcurr));
Write(ref status, ref dcurr, (int)(dend - dcurr));
// Unlike Read, the output is also skipped over when the status is NOTFOUND.
if (status == Status.PENDING)
Write(pendingSeqNo++, ref dcurr, (int)(dend - dcurr));
else if (status == Status.OK || status == Status.NOTFOUND)
serializer.SkipOutput(ref dcurr);
// Published regardless of status, including PENDING.
subscribeKVBroker?.Publish(keyPtr);
break;
case MessageType.Delete:
case MessageType.DeleteAsync:
// Same 2-byte space check as Upsert.
if ((int)(dend - dcurr) < 2)
SendAndReset(ref d, ref dend);
keyPtr = src;
status = session.Delete(ref serializer.ReadKeyByRef(ref src), serialNo: serialNum);
// Reply: message type followed by status.
hrw.Write(message, ref dcurr, (int)(dend - dcurr));
Write(ref status, ref dcurr, (int)(dend - dcurr));
// Published regardless of status.
subscribeKVBroker?.Publish(keyPtr);
break;
default:
// Every other message type is delegated to HandlePubSub; a false return means it is unsupported.
// NOTE(review): on this throw path responseObject is not handed back to messageManager within this method.
if (!HandlePubSub(message, ref src, ref d, ref dend)) throw new NotImplementedException();
break;
}
}
// pendingSeqNo only changes above when an operation returned PENDING, so a difference means
// there are pending operations to complete (the `true` argument presumably requests a blocking wait - confirm).
if (origPendingSeqNo != pendingSeqNo)
session.CompletePending(true);
// Send replies
// If msgnum - start is positive, send the buffer starting at d; otherwise hand the
// unused response object back to messageManager and clear the reference.
if (msgnum - start > 0)
Send(d);
else
{
messageManager.Return(responseObject);
responseObject = null;
}
}
}