in cs/remote/src/FASTER.client/ClientSession.cs [348:580]
/// <summary>
/// Processes one batch of replies stored in <paramref name="buf"/> starting at
/// <paramref name="offset"/>. The batch begins with a <c>BatchHeader</c> (sequence number and
/// message count) followed by the messages; each message starts with a one-byte
/// <c>MessageType</c> and a status. For every message the matching queued request is dequeued
/// (or looked up by its pending sequence number) and completed through a callback on
/// <c>functions</c> or through the queued task completion source.
/// </summary>
/// <param name="buf">Buffer containing the reply batch.</param>
/// <param name="offset">Index in <paramref name="buf"/> of the first byte of the batch header.</param>
/// <exception cref="Exception">
/// The batch sequence number is not <c>lastSeqNo + 1</c> on a non-subscription session, or a
/// subscription message carries a status other than the ones handled below.
/// </exception>
/// <exception cref="NotImplementedException">The message type is not handled by this method.</exception>
/// <remarks>
/// <c>numPendingBatches</c> is decremented only after the whole batch was processed without an
/// exception; the decrement is not inside a <c>finally</c>.
/// NOTE(review): confirm that skipping the decrement on failure is intended.
/// </remarks>
internal void ProcessReplies(byte[] buf, int offset)
{
    // Handed by ref to callbacks when the reply carries no output payload.
    Output defaultOutput = default;
    fixed (byte* b = &buf[offset])
    {
        // src is the read cursor; every Read* helper below advances it past what it consumed.
        var src = b;
        var seqNo = ((BatchHeader*)src)->SeqNo;
        var count = ((BatchHeader*)src)->NumMessages;

        // Batches must arrive in sequence unless this is a subscription session.
        if (seqNo != lastSeqNo + 1 && !subscriptionSession)
            throw new Exception("Out of order message within session");
        lastSeqNo = seqNo;
        src += BatchHeader.Size;

        for (int i = 0; i < count; i++)
        {
            // The first byte of each message is its type.
            switch ((MessageType)(*src++))
            {
                case MessageType.Upsert:
                    {
                        // The status is not passed to the callback, but it still has to be
                        // consumed so the cursor moves to the next message.
                        ReadStatus(ref src);
                        (Key, Value, Context) result = upsertQueue.Dequeue();
                        functions.UpsertCompletionCallback(ref result.Item1, ref result.Item2, result.Item3);
                        break;
                    }
                case MessageType.UpsertAsync:
                    {
                        var status = ReadStatus(ref src);
                        // The queued request entry is not needed here; dequeue it so the
                        // queue stays aligned with the reply stream.
                        upsertQueue.Dequeue();
                        var tcs = tcsQueue.Dequeue();
                        tcs.SetResult((status, default));
                        break;
                    }
                case MessageType.Read:
                    {
                        var status = ReadStatus(ref src);
                        var result = readrmwQueue.Dequeue();
                        if (status == Status.OK)
                        {
                            // Only an OK reply is followed by an output payload.
                            result.Item3 = serializer.ReadOutput(ref src);
                            functions.ReadCompletionCallback(ref result.Item1, ref result.Item2, ref result.Item3, result.Item4, status);
                        }
                        else if (status == Status.PENDING)
                        {
                            // Park the request under the pending sequence number sent by the server.
                            // NOTE(review): presumably completed later via PendingResult/HandlePending - confirm.
                            var p = hrw.ReadPendingSeqNo(ref src);
                            readRmwPendingContext.Add(p, result);
                        }
                        else
                            functions.ReadCompletionCallback(ref result.Item1, ref result.Item2, ref defaultOutput, result.Item4, status);
                        break;
                    }
                case MessageType.ReadAsync:
                    {
                        var status = ReadStatus(ref src);
                        // Entry itself is unused for async completion; dequeue to stay in sync.
                        readrmwQueue.Dequeue();
                        var tcs = tcsQueue.Dequeue();
                        if (status == Status.OK)
                            tcs.SetResult((status, serializer.ReadOutput(ref src)));
                        else if (status == Status.PENDING)
                        {
                            // Park the completion source under the pending sequence number.
                            var p = hrw.ReadPendingSeqNo(ref src);
                            readRmwPendingTcs.Add(p, tcs);
                        }
                        else
                            tcs.SetResult((status, default));
                        break;
                    }
                case MessageType.RMW:
                    {
                        var status = ReadStatus(ref src);
                        var result = readrmwQueue.Dequeue();
                        // Unlike Read, an RMW reply carries an output for NOTFOUND as well as OK.
                        if (status == Status.OK || status == Status.NOTFOUND)
                        {
                            result.Item3 = serializer.ReadOutput(ref src);
                            functions.RMWCompletionCallback(ref result.Item1, ref result.Item2, ref result.Item3, result.Item4, status);
                        }
                        else if (status == Status.PENDING)
                        {
                            var p = hrw.ReadPendingSeqNo(ref src);
                            readRmwPendingContext.Add(p, result);
                        }
                        else
                            functions.RMWCompletionCallback(ref result.Item1, ref result.Item2, ref defaultOutput, result.Item4, status);
                        break;
                    }
                case MessageType.RMWAsync:
                    {
                        var status = ReadStatus(ref src);
                        // Entry itself is unused for async completion; dequeue to stay in sync.
                        readrmwQueue.Dequeue();
                        var tcs = tcsQueue.Dequeue();
                        if (status == Status.OK || status == Status.NOTFOUND)
                            tcs.SetResult((status, serializer.ReadOutput(ref src)));
                        else if (status == Status.PENDING)
                        {
                            var p = hrw.ReadPendingSeqNo(ref src);
                            readRmwPendingTcs.Add(p, tcs);
                        }
                        else
                            tcs.SetResult((status, default));
                        break;
                    }
                case MessageType.Delete:
                    {
                        // Status is consumed but not reported to the callback.
                        ReadStatus(ref src);
                        // Delete requests are queued on upsertQueue as well.
                        (Key, Value, Context) result = upsertQueue.Dequeue();
                        functions.DeleteCompletionCallback(ref result.Item1, result.Item3);
                        break;
                    }
                case MessageType.DeleteAsync:
                    {
                        var status = ReadStatus(ref src);
                        // Entry itself is unused for async completion; dequeue to stay in sync.
                        upsertQueue.Dequeue();
                        var tcs = tcsQueue.Dequeue();
                        tcs.SetResult((status, default));
                        break;
                    }
                case MessageType.SubscribeKV:
                    {
                        var status = ReadStatus(ref src);
                        // Subscription messages always carry the pending sequence number.
                        var p = hrw.ReadPendingSeqNo(ref src);
                        if (status == Status.OK)
                        {
                            // NOTE(review): the TryGetValue result is ignored; when p is not
                            // registered the callback receives a default-initialized tuple.
                            readRmwPendingContext.TryGetValue(p, out var result);
                            result.Item3 = serializer.ReadOutput(ref src);
                            functions.SubscribeKVCallback(ref result.Item1, ref result.Item2, ref result.Item3, result.Item4, Status.OK);
                        }
                        else if (status == Status.NOTFOUND)
                        {
                            readRmwPendingContext.TryGetValue(p, out var result);
                            functions.SubscribeKVCallback(ref result.Item1, ref result.Item2, ref defaultOutput, result.Item4, Status.NOTFOUND);
                        }
                        else if (status == Status.PENDING)
                        {
                            // Register the queued subscription request under p so the OK/NOTFOUND
                            // branches above can look it up.
                            var result = readrmwQueue.Dequeue();
                            readRmwPendingContext.Add(p, result);
                        }
                        else
                        {
                            throw new Exception("Unexpected status of SubscribeKV");
                        }
                        break;
                    }
                case MessageType.PSubscribeKV:
                    {
                        var status = ReadStatus(ref src);
                        var p = hrw.ReadPendingSeqNo(ref src);
                        if (status == Status.OK)
                        {
                            // Same as SubscribeKV, except that the key is read from the message
                            // (before the output) and replaces the key of the stored request.
                            readRmwPendingContext.TryGetValue(p, out var result);
                            result.Item1 = serializer.ReadKey(ref src);
                            result.Item3 = serializer.ReadOutput(ref src);
                            functions.SubscribeKVCallback(ref result.Item1, ref result.Item2, ref result.Item3, result.Item4, Status.OK);
                        }
                        else if (status == Status.NOTFOUND)
                        {
                            readRmwPendingContext.TryGetValue(p, out var result);
                            result.Item1 = serializer.ReadKey(ref src);
                            functions.SubscribeKVCallback(ref result.Item1, ref result.Item2, ref defaultOutput, result.Item4, Status.NOTFOUND);
                        }
                        else if (status == Status.PENDING)
                        {
                            var result = readrmwQueue.Dequeue();
                            readRmwPendingContext.Add(p, result);
                        }
                        else
                        {
                            throw new Exception("Unexpected status of PSubscribeKV");
                        }
                        break;
                    }
                case MessageType.Publish:
                    {
                        // Status is consumed but not reported to the callback.
                        ReadStatus(ref src);
                        // Publish requests are queued on upsertQueue as well.
                        (Key, Value, Context) result = upsertQueue.Dequeue();
                        functions.PublishCompletionCallback(ref result.Item1, ref result.Item2, result.Item3);
                        break;
                    }
                case MessageType.Subscribe:
                    {
                        var status = ReadStatus(ref src);
                        var p = hrw.ReadPendingSeqNo(ref src);
                        if (status == Status.OK)
                        {
                            // NOTE(review): the TryGetValue result is ignored; when p is not
                            // registered the callback receives a default-initialized tuple.
                            pubsubPendingContext.TryGetValue(p, out var result);
                            result.Item2 = serializer.ReadValue(ref src);
                            functions.SubscribeCallback(ref result.Item1, ref result.Item2, result.Item3);
                        }
                        else if (status == Status.PENDING)
                        {
                            // Register the queued subscription request under p.
                            var result = pubsubQueue.Dequeue();
                            pubsubPendingContext.Add(p, result);
                        }
                        else
                        {
                            throw new Exception("Unexpected status of Subscribe");
                        }
                        break;
                    }
                case MessageType.PSubscribe:
                    {
                        var status = ReadStatus(ref src);
                        var p = hrw.ReadPendingSeqNo(ref src);
                        if (status == Status.OK)
                        {
                            // Same as Subscribe, except that the key is read from the message
                            // (before the value) and replaces the key of the stored request.
                            pubsubPendingContext.TryGetValue(p, out var result);
                            result.Item1 = serializer.ReadKey(ref src);
                            result.Item2 = serializer.ReadValue(ref src);
                            functions.SubscribeCallback(ref result.Item1, ref result.Item2, result.Item3);
                        }
                        else if (status == Status.PENDING)
                        {
                            var result = pubsubQueue.Dequeue();
                            pubsubPendingContext.Add(p, result);
                        }
                        else
                        {
                            throw new Exception("Unexpected status of PSubscribe");
                        }
                        break;
                    }
                case MessageType.PendingResult:
                    {
                        // Delegated entirely to HandlePending, which receives the cursor by ref.
                        HandlePending(ref src);
                        break;
                    }
                default:
                    throw new NotImplementedException();
            }
        }
    }
    Interlocked.Decrement(ref numPendingBatches);
}