CustomDeserializers/ExampleDeserializer.cs (49 lines of code) (raw):
//*********************************************************
//
// Copyright (c) Microsoft. All rights reserved.
// This code is licensed under the Microsoft Public License.
// THIS CODE IS PROVIDED *AS IS* WITHOUT WARRANTY OF
// ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING ANY
// IMPLIED WARRANTIES OF FITNESS FOR A PARTICULAR
// PURPOSE, MERCHANTABILITY, OR NON-INFRINGEMENT.
//
//*********************************************************
using System.Collections.Generic;
using System.IO;
using Microsoft.Azure.StreamAnalytics;
using Microsoft.Azure.StreamAnalytics.Serialization;
namespace ExampleCustomCode.Serialization
{
// Deserializes a stream into objects of type CustomEvent.
// It reads the Stream line by line and assumes each line has three columns separated by ",".
// Writes an error to diagnostics and skips the line otherwise.
public class CustomCsvDeserializer : StreamDeserializer<CustomEvent>
{
    // Number of comma-separated values a line must contain to become a CustomEvent.
    private const int ExpectedColumnCount = 3;

    // Sink for error messages; assigned in Initialize from the streaming context.
    private StreamingDiagnostics diagnostics;

    // Captures the diagnostics object exposed by the supplied streaming context so that
    // Deserialize can report malformed lines.
    public override void Initialize(StreamingContext streamingContext)
    {
        diagnostics = streamingContext.Diagnostics;
    }

    // Lazily reads the stream one line at a time and yields a CustomEvent for every line
    // that splits into exactly three values on ','.
    // Empty or whitespace-only lines are ignored. Lines with any other column count are
    // reported through diagnostics and skipped; deserialization then continues.
    // The StreamReader wrapping the stream is disposed when enumeration completes or the
    // enumerator is disposed.
    public override IEnumerable<CustomEvent> Deserialize(Stream stream)
    {
        using (var reader = new StreamReader(stream))
        {
            for (string line = reader.ReadLine(); line != null; line = reader.ReadLine())
            {
                // Nothing to parse on a blank line.
                if (string.IsNullOrWhiteSpace(line))
                {
                    continue;
                }

                string[] columns = line.Split(',');
                if (columns.Length != ExpectedColumnCount)
                {
                    // Malformed line: log it and move on to the next one.
                    diagnostics.WriteError("Did not get expected number of columns", $"Invalid line: {line}");
                    continue;
                }

                // Map the three values, in order, onto the event's columns.
                yield return new CustomEvent()
                {
                    Column1 = columns[0],
                    Column2 = columns[1],
                    Column3 = columns[2]
                };
            }
        }
    }
}
/*
The CustomEvent class follows the rules mentioned below.
All public fields are either:
1. One of [long, DateTime, string, double] or their nullable equivalents
2. Another struct or class following the same rules
3. Array of type <T2> that follows the same rules
4. IList`T2` where T2 follows the same rules
5. Does not have any recursive types.
*/
// Event type produced by CustomCsvDeserializer.Deserialize: one instance is created for each
// input line that contains exactly three comma-separated values.
public class CustomEvent
{
// First comma-separated value of the line.
public string Column1 { get; set; }
// Second comma-separated value of the line.
public string Column2 { get; set; }
// Third comma-separated value of the line.
public string Column3 { get; set; }
}
}