CustomDeserializers/AvroExample/EventhubCaptureCustomEventReader.cs (54 lines of code) (raw):

//********************************************************* // // Copyright (c) Microsoft. All rights reserved. // This code is licensed under the Microsoft Public License. // THIS CODE IS PROVIDED *AS IS* WITHOUT WARRANTY OF // ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING ANY // IMPLIED WARRANTIES OF FITNESS FOR A PARTICULAR // PURPOSE, MERCHANTABILITY, OR NON-INFRINGEMENT. // //********************************************************* using System; using System.Collections.Generic; using System.IO; using System.IO.Compression; using System.Linq; using ExampleCustomCode.Serialization; using Microsoft.Azure.StreamAnalytics; namespace ExampleCustomCode.AvroSerialization { // Reads eventhub capture in avro format and deserializes EventData.Body as a simple record. public sealed class EventhubCaptureCustomEventReader : EventhubCaptureReader<EventHubRecord> { // Replace ExampleDeserializer according to the actual data format. Alternatively, implement deserialization of EventData.Body inline in DeserializeEventData() function. private readonly CustomCsvDeserializer contentDeserializer = new CustomCsvDeserializer(); public override void Initialize(StreamingContext streamingContext) { this.contentDeserializer.Initialize(streamingContext); base.Initialize(streamingContext); } // this method shows an example of how EventData.Body can be deserialized into multiple records. protected override IEnumerable<EventHubRecord> DeserializeEventData(EventDataFromCapture eventData) { // assumes EventData.Body is a gzipped line separated records. using (var stream = new MemoryStream(eventData.Body)) { try { // deserialize body from a single eventdata completely. Skip eventData message body with invalid format. using (var unzippedStream = new GZipStream(stream, CompressionMode.Decompress)) { return this.contentDeserializer .Deserialize(unzippedStream) .Select( payload => new EventHubRecord() { Offset = eventData.Offset, Payload = payload }) .ToArray(); } } catch (Exception e) { var shortErrorMessage = $"Error deserializing eventhub message with offset: {eventData.Offset} SequenceNumber:{eventData.SequenceNumber}"; this.diagnostics.WriteError( shortErrorMessage, $"{shortErrorMessage} Exception: {e}"); return Enumerable.Empty<EventHubRecord>(); } } } } public class EventHubRecord { public string Offset { get; set; } public CustomEvent Payload { get; set; } } }