CustomDeserializers/AvroExample/EventhubCaptureReader.cs (73 lines of code) (raw):

//********************************************************* // // Copyright (c) Microsoft. All rights reserved. // This code is licensed under the Microsoft Public License. // THIS CODE IS PROVIDED *AS IS* WITHOUT WARRANTY OF // ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING ANY // IMPLIED WARRANTIES OF FITNESS FOR A PARTICULAR // PURPOSE, MERCHANTABILITY, OR NON-INFRINGEMENT. // //********************************************************* using System; using System.Collections.Generic; using System.IO; using Avro.Generic; using Avro.IO; using Avro.File; using Microsoft.Azure.StreamAnalytics; using Microsoft.Azure.StreamAnalytics.Serialization; namespace ExampleCustomCode.AvroSerialization { // Reads eventhub capture format. https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-capture-overview public abstract class EventhubCaptureReader<T> : StreamDeserializer<T> { protected StreamingDiagnostics diagnostics; public override void Initialize(StreamingContext streamingContext) { this.diagnostics = streamingContext.Diagnostics; } public override IEnumerable<T> Deserialize(Stream stream) { IFileReader<GenericRecord> reader = null; try { reader = DataFileReader<GenericRecord>.OpenReader(stream); } catch(Exception e) { this.diagnostics.WriteError( briefMessage: "Unable to open stream as avro. Please check if the stream is from eventhub capture. https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-capture-overview ", detailedMessage: e.Message); throw; } foreach(GenericRecord genericRecord in reader.NextEntries) { EventDataFromCapture eventData = this.ConvertToEventDataFromCapture(genericRecord); // deserialize records from eventdata body. foreach (T record in this.DeserializeEventData(eventData)) { yield return record; } } reader.Dispose(); } protected abstract IEnumerable<T> DeserializeEventData(EventDataFromCapture eventData); private EventDataFromCapture ConvertToEventDataFromCapture(GenericRecord genericRecord) { try { var eventData = new EventDataFromCapture() { EnqueuedTimeUtc = (string)genericRecord["EnqueuedTimeUtc"], Offset = (string)genericRecord["Offset"], SequenceNumber = (long)genericRecord["SequenceNumber"], Body = (byte[])genericRecord["Body"], }; return eventData; } catch(Exception e) { this.diagnostics.WriteError( briefMessage: $"Unable to get fields required to create captured event data. Error: {e.Message}", detailedMessage: e.ToString()); throw; } } } public class EventDataFromCapture { public long SequenceNumber { get; set; } public string Offset { get; set; } public string EnqueuedTimeUtc { get; set; } public byte[] Body { get; set; } } }