in CustomDeserializers/AvroExample/EventhubCaptureCustomEventReader.cs [37:67]
/// <summary>
/// Deserializes the body of a single captured event into <see cref="EventHubRecord"/> instances.
/// </summary>
/// <param name="eventData">
/// The captured event. Its <c>Body</c> is assumed to be gzip-compressed, line-separated records.
/// </param>
/// <returns>
/// One record per payload produced by the content deserializer, each carrying the event's
/// <c>Offset</c>; an empty sequence when the body cannot be read or deserialized.
/// </returns>
protected override IEnumerable<EventHubRecord> DeserializeEventData(EventDataFromCapture eventData)
{
    try
    {
        // The MemoryStream is created inside the try block on purpose: a null Body makes the
        // constructor throw, and that case must be logged and skipped like any other
        // malformed body instead of escaping to the caller.
        using (var stream = new MemoryStream(eventData.Body))
        using (var unzippedStream = new GZipStream(stream, CompressionMode.Decompress))
        {
            // Deserialize the body of a single event completely. ToArray forces the whole
            // enumeration to run here, while the streams are still open and while any
            // failure is still covered by the catch below (all-or-nothing per event).
            return this.contentDeserializer
                .Deserialize(unzippedStream)
                .Select(
                    payload => new EventHubRecord()
                    {
                        Offset = eventData.Offset,
                        Payload = payload
                    })
                .ToArray();
        }
    }
    catch (Exception e)
    {
        // Skip event bodies with an invalid format: report the failure and return no records.
        var shortErrorMessage = $"Error deserializing eventhub message with offset: {eventData.Offset} SequenceNumber:{eventData.SequenceNumber}";
        this.diagnostics.WriteError(
            shortErrorMessage,
            $"{shortErrorMessage} Exception: {e}");
        return Enumerable.Empty<EventHubRecord>();
    }
}