in sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Protocols/WebPubSubJsonProtocolBase.cs [70:307]
public virtual IReadOnlyList<WebPubSubMessage> ParseMessage(ReadOnlySequence<byte> input)
{
    // Parses a single UTF-8 JSON object from `input` into a downstream message.
    //
    // Returns a one-element list holding an AckMessage, ServerDataMessage,
    // GroupDataMessage, ConnectedMessage or DisconnectedMessage, depending on the
    // parsed 'type', 'from' and 'event' values. Returns null when one of those
    // enum values has no matching case below (kept for forward compatibility).
    //
    // Throws InvalidDataException when the 'type' property is missing or not a
    // string, when an enum-valued property cannot be parsed, when 'ackId' or
    // 'sequenceId' raise a FormatException while being read, when 'data' does not
    // match 'dataType', or when the reader raises a JsonException.
    try
    {
        string type = null;
        DownstreamEventType eventType = DownstreamEventType.Ack;
        string group = null;
        string @event = null;
        SystemEventType systemEventType = SystemEventType.Connected;
        long? ackId = null;
        long? sequenceId = null;
        bool? success = null;
        string from = null;
        FromType fromType = FromType.Server;
        AckMessageError errorDetail = null;
        WebPubSubDataType dataType = WebPubSubDataType.Text;
        string userId = null;
        string connectionId = null;
        string reconnectionToken = null;
        string message = null;
        string fromUserId = null;

        var completed = false;
        bool hasDataToken = false;
        BinaryData data = null;
        SequencePosition dataStart = default;
        Utf8JsonReader dataReader = default;

        var reader = new Utf8JsonReader(input, isFinalBlock: true, state: default);

        reader.CheckRead();

        // We're always parsing a JSON object
        reader.EnsureObjectStart();

        do
        {
            switch (reader.TokenType)
            {
                case JsonTokenType.PropertyName:
                    if (reader.ValueTextEquals(TypePropertyNameBytes.EncodedUtf8Bytes))
                    {
                        type = reader.ReadAsNullableString(TypePropertyName);
                        if (type == null)
                        {
                            throw new InvalidDataException($"Expected '{TypePropertyName}' to be of type {JsonTokenType.String}.");
                        }
                        if (!Enum.TryParse(type, true, out eventType))
                        {
                            throw new InvalidDataException($"Unknown '{TypePropertyName}': {type}.");
                        }
                    }
                    else if (reader.ValueTextEquals(GroupPropertyNameBytes.EncodedUtf8Bytes))
                    {
                        group = reader.ReadAsNullableString(GroupPropertyName);
                    }
                    else if (reader.ValueTextEquals(EventPropertyNameBytes.EncodedUtf8Bytes))
                    {
                        @event = reader.ReadAsNullableString(EventPropertyName);
                        if (!Enum.TryParse(@event, true, out systemEventType))
                        {
                            throw new InvalidDataException($"Unknown '{EventPropertyName}': {@event}.");
                        }
                    }
                    else if (reader.ValueTextEquals(DataTypePropertyNameBytes.EncodedUtf8Bytes))
                    {
                        var dataTypeValue = reader.ReadAsNullableString(DataTypePropertyName);
                        if (!Enum.TryParse<WebPubSubDataType>(dataTypeValue, true, out dataType))
                        {
                            throw new InvalidDataException($"Unknown '{DataTypePropertyName}': {dataTypeValue}.");
                        }
                    }
                    else if (reader.ValueTextEquals(AckIdPropertyNameBytes.EncodedUtf8Bytes))
                    {
                        try
                        {
                            ackId = reader.ReadAsLongNonNegtive(AckIdPropertyName);
                        }
                        catch (FormatException)
                        {
                            throw new InvalidDataException($"'{AckIdPropertyName}' is not a valid uint64 value.");
                        }
                    }
                    else if (reader.ValueTextEquals(DataPropertyNameBytes.EncodedUtf8Bytes))
                    {
                        // 'dataType' may appear after 'data', so decoding is deferred:
                        // remember where the value starts, skip over it, and keep a
                        // copy of the reader positioned on the value for later.
                        hasDataToken = true;
                        dataStart = reader.Position;
                        reader.Skip();
                        dataReader = reader;
                    }
                    else if (reader.ValueTextEquals(SequenceIdPropertyNameBytes.EncodedUtf8Bytes))
                    {
                        try
                        {
                            sequenceId = reader.ReadAsLongNonNegtive(SequenceIdPropertyName);
                        }
                        catch (FormatException)
                        {
                            throw new InvalidDataException($"'{SequenceIdPropertyName}' is not a valid uint64 value.");
                        }
                    }
                    else if (reader.ValueTextEquals(SuccessPropertyNameBytes.EncodedUtf8Bytes))
                    {
                        success = reader.ReadAsBoolean(SuccessPropertyName);
                    }
                    else if (reader.ValueTextEquals(ErrorPropertyNameBytes.EncodedUtf8Bytes))
                    {
                        errorDetail = ReadErrorDetail(reader);

                        // Utf8JsonReader is a struct and is passed by value above, so
                        // ReadErrorDetail consumed a copy and this reader is still on
                        // the 'error' property name. Skip the value here; otherwise the
                        // loop would descend into the nested object, pick up its
                        // 'message' member as the top-level one, and stop at the nested
                        // EndObject, dropping every property that follows 'error'.
                        reader.Skip();
                    }
                    else if (reader.ValueTextEquals(FromPropertyNameBytes.EncodedUtf8Bytes))
                    {
                        from = reader.ReadAsNullableString(FromPropertyName);
                        if (!Enum.TryParse(from, true, out fromType))
                        {
                            throw new InvalidDataException($"Unknown '{FromPropertyName}': {from}.");
                        }
                    }
                    else if (reader.ValueTextEquals(UserIdPropertyNameBytes.EncodedUtf8Bytes))
                    {
                        userId = reader.ReadAsNullableString(UserIdPropertyName);
                    }
                    else if (reader.ValueTextEquals(ConnectionIdPropertyNameBytes.EncodedUtf8Bytes))
                    {
                        connectionId = reader.ReadAsNullableString(ConnectionIdPropertyName);
                    }
                    else if (reader.ValueTextEquals(ReconnectionTokenPropertyNameBytes.EncodedUtf8Bytes))
                    {
                        reconnectionToken = reader.ReadAsNullableString(ReconnectionTokenPropertyName);
                    }
                    else if (reader.ValueTextEquals(MessagePropertyNameBytes.EncodedUtf8Bytes))
                    {
                        message = reader.ReadAsNullableString(MessagePropertyName);
                    }
                    else if (reader.ValueTextEquals(FromUserIdPropertyNameBytes.EncodedUtf8Bytes))
                    {
                        fromUserId = reader.ReadAsNullableString(FromUserIdPropertyName);
                    }
                    else
                    {
                        // Unknown property: move to its value and skip it,
                        // including any nested object or array.
                        reader.CheckRead();
                        reader.Skip();
                    }
                    break;
                case JsonTokenType.EndObject:
                    completed = true;
                    break;
            }
        }
        while (!completed && reader.CheckRead());

        if (type == null)
        {
            throw new InvalidDataException($"Missing required property '{TypePropertyName}'.");
        }

        if (hasDataToken)
        {
            if (dataType == WebPubSubDataType.Binary ||
                dataType == WebPubSubDataType.Protobuf ||
                dataType == WebPubSubDataType.Text)
            {
                if (dataReader.TokenType != JsonTokenType.String)
                {
                    throw new InvalidDataException($"'data' should be a string when 'dataType' is 'binary,text,protobuf'.");
                }

                if (dataType == WebPubSubDataType.Binary ||
                    dataType == WebPubSubDataType.Protobuf)
                {
                    if (!dataReader.TryGetBytesFromBase64(out var bytes))
                    {
                        throw new InvalidDataException($"'data' is not a valid base64 encoded string.");
                    }
                    data = new BinaryData(bytes);
                }
                else
                {
                    data = new BinaryData(dataReader.GetString());
                }
            }
            else if (dataType == WebPubSubDataType.Json)
            {
                if (dataReader.TokenType == JsonTokenType.Null)
                {
                    throw new InvalidDataException($"Invalid value for '{DataPropertyName}': null.");
                }

                // Copy the raw bytes of the JSON value: from the position recorded
                // at the 'data' property name to the reader position after the skip.
                var end = dataReader.Position;
                data = new BinaryData(input.Slice(dataStart, end).ToArray());
            }
        }

        switch (eventType)
        {
            case DownstreamEventType.Ack:
                AssertNotNull(ackId, AckIdPropertyName);
                AssertNotNull(success, SuccessPropertyName);
                return new List<WebPubSubMessage> { new AckMessage(ackId.Value, success.Value, errorDetail) };

            case DownstreamEventType.Message:
                AssertNotNull(from, FromPropertyName);
                // dataType is a non-nullable enum that defaults to Text; the check is
                // reported under its own property name rather than 'from'.
                AssertNotNull(dataType, DataTypePropertyName);
                AssertNotNull(data, DataPropertyName);
                switch (fromType)
                {
                    case FromType.Server:
                        return new List<WebPubSubMessage> { new ServerDataMessage(dataType, data, sequenceId) };
                    case FromType.Group:
                        AssertNotNull(group, GroupPropertyName);
                        return new List<WebPubSubMessage> { new GroupDataMessage(group, dataType, data, sequenceId, fromUserId) };
                    // Forward compatible
                    default:
                        return null;
                }

            case DownstreamEventType.System:
                AssertNotNull(@event, EventPropertyName);
                switch (systemEventType)
                {
                    case SystemEventType.Connected:
                        return new List<WebPubSubMessage> { new ConnectedMessage(userId, connectionId, reconnectionToken) };
                    case SystemEventType.Disconnected:
                        return new List<WebPubSubMessage> { new DisconnectedMessage(message) };
                    // Forward compatible
                    default:
                        return null;
                }

            // Forward compatible
            default:
                return null;
        }
    }
    catch (JsonException ex)
    {
        throw new InvalidDataException("Error reading JSON.", ex);
    }
}