in csharp/json/src/KafkaJsonDeserializer.cs [43:99]
public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
{
if (isNull || data == null || data.IsEmpty)
{
return default(T);
}
if (!context.Headers.TryGetLastBytes("schemaId", out var lastHeader) || lastHeader.Length == 0)
{
return default(T);
}
var schemaId = UTF8Encoding.UTF8.GetString(lastHeader);
if (string.IsNullOrEmpty(schemaId))
{
return default(T);
}
var schemaRegistryData = this.schemaRegistryClient.GetSchema(schemaId).Value;
if (schemaRegistryData.Properties.Format != SchemaFormat.Json)
{
throw new SerializationException(new Error(ErrorCode.Local_ValueDeserialization, $"Schema id {schemaId} is not of json format. the schema is a {schemaRegistryData.Properties.Format} schema."));
}
else if (string.IsNullOrEmpty(schemaRegistryData.Definition))
{
throw new SerializationException(new Error(ErrorCode.Local_ValueDeserialization, $"Schema id {schemaId} has empty schema."));
}
// This implementation is actually based on the old Newtonsoft Json implementation which
// uses a older json-schema draft version.
// When we updated to use the latest Newtonsoft package/draft, this implementation will
// need to change using the new classes.
using (var stringReader = new StringReader(UTF8Encoding.UTF8.GetString(data)))
{
JsonTextReader reader = new JsonTextReader(stringReader);
try
{
JsonValidatingReader validatingReader = new JsonValidatingReader(reader);
validatingReader.Schema = JsonSchema.Parse(schemaRegistryData.Definition);
IList<string> messages = new List<string>();
validatingReader.ValidationEventHandler += (o, a) => messages.Add(a.Message);
T obj = serializer.Deserialize<T>(validatingReader);
if (messages.Count > 0)
{
throw new SerializationException(new Error(ErrorCode.Local_ValueDeserialization, string.Concat(messages)));
}
return obj;
}
finally
{
reader.Close();
}
}
}