in Source/Apps/Microsoft/Released/Microsoft-IoTContinuousDataExportTemplate/Service/Data/deviceTemplates/run.csx [21:154]
/// <summary>
/// Reads an Avro container of device-template records from <paramref name="myBlob"/>,
/// flattens every record into rows of the device-template, measurement-definition and
/// property-definition tables, and sends each table to SQL through its stored procedure.
/// </summary>
/// <param name="myBlob">Blob holding the Avro container; its LastModified value is used as the row timestamp.</param>
/// <param name="log">Trace writer that receives progress and error messages.</param>
/// <remarks>
/// A record that fails to parse is logged, counted and skipped; it does not abort the run.
/// Failures while downloading the blob or talking to SQL are not caught here and propagate to the caller.
/// </remarks>
/// <exception cref="InvalidOperationException">The SQL_CONNECTIONSTRING app setting is missing or empty.</exception>
public static async Task Run(CloudBlockBlob myBlob, TraceWriter log)
{
    log.Info($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - Processing blob {myBlob.StorageUri}");

    // Properties such as LastModified are only read after the attributes have been fetched.
    await myBlob.FetchAttributesAsync();
    var timestamp = myBlob.Properties.LastModified.Value;

    var templates = new List<DeviceTemplate>();
    int parseFailCount = 0;

    using (var blobStream = new MemoryStream())
    {
        // Download the blob content into memory and rewind so the reader starts at the beginning.
        await myBlob.DownloadToStreamAsync(blobStream);
        blobStream.Position = 0;

        // Create an Avro generic reader over the in-memory copy.
        using (var reader = AvroContainer.CreateGenericReader(blobStream))
        {
            // Loop through the blocks within the container.
            while (reader.MoveNext())
            {
                // Loop through the Avro records inside the block and read all the fields.
                foreach (AvroRecord record in reader.Current.Objects)
                {
                    try
                    {
                        var deviceTemplateId = record.GetField<string>("id");
                        var deviceTemplateName = record.GetField<string>("name");
                        var deviceTemplateVersion = record.GetField<string>("version");

                        var measurementsRecord = record.GetField<AvroRecord>("measurements");
                        var telemetry = measurementsRecord
                            .GetField<IDictionary<string, dynamic>>("telemetry")
                            .ToDictionary(entry => entry.Key, entry => ProcessingMeasurement(entry.Value as AvroRecord));
                        var states = measurementsRecord
                            .GetField<IDictionary<string, dynamic>>("states")
                            .ToDictionary(entry => entry.Key, entry => ProcessingMeasurement(entry.Value as AvroRecord));
                        var events = measurementsRecord
                            .GetField<IDictionary<string, dynamic>>("events")
                            .ToDictionary(entry => entry.Key, entry => ProcessingMeasurement(entry.Value as AvroRecord));

                        var propertiesRecord = record.GetField<AvroRecord>("properties");
                        var cloudProperties = propertiesRecord
                            .GetField<IDictionary<string, dynamic>>("cloud")
                            .ToDictionary(entry => entry.Key, entry => ProcessingProperty(entry.Value as AvroRecord));
                        var deviceProperties = propertiesRecord
                            .GetField<IDictionary<string, dynamic>>("device")
                            .ToDictionary(entry => entry.Key, entry => ProcessingProperty(entry.Value as AvroRecord));

                        var settingsRecord = record.GetField<AvroRecord>("settings");
                        var deviceSettings = settingsRecord
                            .GetField<IDictionary<string, dynamic>>("device")
                            .ToDictionary(entry => entry.Key, entry => ProcessingProperty(entry.Value as AvroRecord));

                        templates.Add(new DeviceTemplate()
                        {
                            TemplateId = deviceTemplateId,
                            TemplateName = deviceTemplateName,
                            TemplateVersion = deviceTemplateVersion,
                            Telemetry = telemetry,
                            States = states,
                            Events = events,
                            CloudProperties = cloudProperties,
                            DeviceProperties = deviceProperties,
                            DeviceSettings = deviceSettings
                        });
                    }
                    catch (Exception ex)
                    {
                        // Deliberately broad: one malformed record must not stop the remaining records.
                        log.Error($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - Failed to process Avro record");
                        log.Error($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - {ex.ToString()}");
                        parseFailCount++;
                    }
                }
            }
        }
    }

    log.Info($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - Parsed {templates.Count} templates with {parseFailCount} failures");

    var templatesTable = CreateDeviceTemplatesTable();
    var measurementDefinitionsTable = CreateMeaurementDefinitionsTable();
    var propertyDefinitionsTable = CreatePropertyDefinitionsTable();

    foreach (var template in templates)
    {
        // One row per template; the row id combines the template id and version.
        var templateRow = templatesTable.NewRow();
        templateRow["id"] = $"{template.TemplateId}/{template.TemplateVersion}";
        templateRow["deviceTemplateId"] = template.TemplateId;
        templateRow["deviceTemplateVersion"] = template.TemplateVersion;
        templateRow["name"] = template.TemplateName;
        templateRow["timestamp"] = timestamp.UtcDateTime;
        templatesTable.Rows.Add(templateRow);

        foreach (var entry in template.Telemetry)
        {
            InsertMeasurementIntoTable(template, entry.Key, entry.Value, MeasurementKind.Telemetry, measurementDefinitionsTable, timestamp);
        }

        foreach (var entry in template.States)
        {
            InsertMeasurementIntoTable(template, entry.Key, entry.Value, MeasurementKind.State, measurementDefinitionsTable, timestamp);
        }

        foreach (var entry in template.Events)
        {
            InsertMeasurementIntoTable(template, entry.Key, entry.Value, MeasurementKind.Event, measurementDefinitionsTable, timestamp);
        }

        foreach (var entry in template.CloudProperties)
        {
            InsertPropertyIntoTable(template, entry.Key, entry.Value, PropertyKind.CloudProperty, propertyDefinitionsTable, timestamp);
        }

        foreach (var entry in template.DeviceProperties)
        {
            InsertPropertyIntoTable(template, entry.Key, entry.Value, PropertyKind.DeviceProperty, propertyDefinitionsTable, timestamp);
        }

        foreach (var entry in template.DeviceSettings)
        {
            InsertPropertyIntoTable(template, entry.Key, entry.Value, PropertyKind.DeviceSetting, propertyDefinitionsTable, timestamp);
        }
    }

    var cs = ConfigurationManager.AppSettings["SQL_CONNECTIONSTRING"];
    if (string.IsNullOrWhiteSpace(cs))
    {
        // Fail with a message that names the setting instead of an opaque error from the connection.
        throw new InvalidOperationException("The SQL_CONNECTIONSTRING app setting is missing or empty.");
    }

    using (SqlConnection conn = new SqlConnection(cs))
    {
        // Open asynchronously so the thread is not blocked while the connection is established.
        await conn.OpenAsync();

        // NOTE(review): the three calls below are not wrapped in a transaction, so a failure
        // part-way through leaves the earlier tables already written — confirm this is intended.
        await ExecuteTableInsertAsync(conn, "dbo.[InsertDeviceTemplates]", templatesTable, log);
        await ExecuteTableInsertAsync(conn, "dbo.[InsertMeasurementDefinitions]", measurementDefinitionsTable, log);
        await ExecuteTableInsertAsync(conn, "dbo.[InsertPropertyDefinitions]", propertyDefinitionsTable, log);
    }
}

/// <summary>
/// Runs <paramref name="storedProcedure"/> on <paramref name="conn"/>, passing <paramref name="table"/>
/// as the @tableType parameter, and logs the table name before the call and the affected row count after it.
/// </summary>
/// <param name="conn">An already opened SQL connection.</param>
/// <param name="storedProcedure">Name of the stored procedure to execute.</param>
/// <param name="table">Rows to send to the stored procedure.</param>
/// <param name="log">Trace writer that receives the progress messages.</param>
private static async Task ExecuteTableInsertAsync(SqlConnection conn, string storedProcedure, DataTable table, TraceWriter log)
{
    log.Info($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - Inserting into table: {table.TableName}");

    using (SqlCommand cmd = new SqlCommand(storedProcedure, conn))
    {
        cmd.CommandType = CommandType.StoredProcedure;
        cmd.Parameters.Add(new SqlParameter("@tableType", table));

        var rows = await cmd.ExecuteNonQueryAsync();
        log.Info($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - Added/Updated {rows} rows to the database table {table.TableName}");
    }
}