public static async Task Run()

in Source/Apps/Microsoft/Released/Microsoft-IoTContinuousDataExportTemplate/Service/Data/deviceTemplates/run.csx [21:154]


/// <summary>
/// Reads an Avro container of device templates from a blob, flattens every record
/// into in-memory tables and hands those tables to three SQL stored procedures.
/// </summary>
/// <param name="myBlob">
/// Blob holding the Avro container. Its last-modified time is written to each
/// template row and passed on to the measurement/property row helpers.
/// </param>
/// <param name="log">Writer that receives progress and error messages.</param>
/// <remarks>
/// A record that fails to parse is logged and counted but does not stop processing.
/// Failures while downloading the blob, reading the container, or talking to SQL
/// are not caught here and propagate to the caller.
/// </remarks>
public static async Task Run(CloudBlockBlob myBlob, TraceWriter log)
{
    log.Info($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - Processing blob {myBlob.StorageUri}");

    // Attributes must be fetched before Properties.LastModified can be read.
    await myBlob.FetchAttributesAsync();
    var timestamp = myBlob.Properties.LastModified.Value;

    var templates = new List<DeviceTemplate>();
    int parseFailCount = 0;

    using (var blobStream = new MemoryStream())
    {
        // Download blob content and save them to memory stream
        await myBlob.DownloadToStreamAsync(blobStream);

        // Rewind so the Avro reader starts at the beginning of the downloaded bytes.
        blobStream.Position = 0;

        // Create Avro generic reader from memory stream
        using (var reader = AvroContainer.CreateGenericReader(blobStream))
        {
            // Loop through blocks within the container
            while (reader.MoveNext())
            {
                // Loop through Avro record inside the block and get all the fields
                foreach (AvroRecord record in reader.Current.Objects)
                {
                    try
                    {
                        var deviceTemplateId = record.GetField<string>("id");
                        var deviceTemplateName = record.GetField<string>("name");
                        var deviceTemplateVersion = record.GetField<string>("version");

                        // "measurements" is a nested record holding three maps keyed by field name.
                        var measurementsRecord = record.GetField<AvroRecord>("measurements");

                        var telemetryMap = measurementsRecord.GetField<IDictionary<string, dynamic>>("telemetry");
                        var telemetry = telemetryMap.ToDictionary(e => e.Key, e => ProcessingMeasurement(e.Value as AvroRecord));

                        var statesMap = measurementsRecord.GetField<IDictionary<string, dynamic>>("states");
                        var states = statesMap.ToDictionary(e => e.Key, e => ProcessingMeasurement(e.Value as AvroRecord));

                        var eventsMap = measurementsRecord.GetField<IDictionary<string, dynamic>>("events");
                        var events = eventsMap.ToDictionary(e => e.Key, e => ProcessingMeasurement(e.Value as AvroRecord));

                        // "properties" is a nested record with a "cloud" map and a "device" map.
                        var propertiesRecord = record.GetField<AvroRecord>("properties");

                        var cloudPropertiesMap = propertiesRecord.GetField<IDictionary<string, dynamic>>("cloud");
                        var cloudProperties = cloudPropertiesMap.ToDictionary(e => e.Key, e => ProcessingProperty(e.Value as AvroRecord));

                        var devicePropertiesMap = propertiesRecord.GetField<IDictionary<string, dynamic>>("device");
                        var deviceProperties = devicePropertiesMap.ToDictionary(e => e.Key, e => ProcessingProperty(e.Value as AvroRecord));

                        // "settings" is a nested record; only its "device" map is read.
                        var settingsRecord = record.GetField<AvroRecord>("settings");
                        var deviceSettingsMap = settingsRecord.GetField<IDictionary<string, dynamic>>("device");
                        var deviceSettings = deviceSettingsMap.ToDictionary(e => e.Key, e => ProcessingProperty(e.Value as AvroRecord));

                        // The template is added only after every field parsed, so a failing
                        // record never leaves a partially populated entry in the list.
                        templates.Add(new DeviceTemplate()
                        {
                            TemplateId = deviceTemplateId,
                            TemplateName = deviceTemplateName,
                            TemplateVersion = deviceTemplateVersion,
                            Telemetry = telemetry,
                            States = states,
                            Events = events,
                            CloudProperties = cloudProperties,
                            DeviceProperties = deviceProperties,
                            DeviceSettings = deviceSettings
                        });
                    }
                    catch (Exception e)
                    {
                        // Best effort: log and count the bad record, then continue with the next one.
                        log.Error($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - Failed to process Avro record");
                        log.Error($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - {e.ToString()}");
                        parseFailCount++;
                    }
                }
            }
        }
    }

    log.Info($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - Parsed {templates.Count} templates with {parseFailCount} failures");

    var templatesTable = CreateDeviceTemplatesTable();
    var measurementDefinitionsTable = CreateMeaurementDefinitionsTable();
    var propertyDefinitionsTable = CreatePropertyDefinitionsTable();

    foreach (var template in templates)
    {
        // The row id combines template id and version.
        var templateRow = templatesTable.NewRow();
        templateRow["id"] = $"{template.TemplateId}/{template.TemplateVersion}";
        templateRow["deviceTemplateId"] = template.TemplateId;
        templateRow["deviceTemplateVersion"] = template.TemplateVersion;
        templateRow["name"] = template.TemplateName;
        templateRow["timestamp"] = timestamp.UtcDateTime;

        templatesTable.Rows.Add(templateRow);

        // Plain loops instead of ToList().ForEach(): same order, no throw-away list copies.
        foreach (var entry in template.Telemetry)
        {
            InsertMeasurementIntoTable(template, entry.Key, entry.Value, MeasurementKind.Telemetry, measurementDefinitionsTable, timestamp);
        }

        foreach (var entry in template.States)
        {
            InsertMeasurementIntoTable(template, entry.Key, entry.Value, MeasurementKind.State, measurementDefinitionsTable, timestamp);
        }

        foreach (var entry in template.Events)
        {
            InsertMeasurementIntoTable(template, entry.Key, entry.Value, MeasurementKind.Event, measurementDefinitionsTable, timestamp);
        }

        foreach (var entry in template.CloudProperties)
        {
            InsertPropertyIntoTable(template, entry.Key, entry.Value, PropertyKind.CloudProperty, propertyDefinitionsTable, timestamp);
        }

        foreach (var entry in template.DeviceProperties)
        {
            InsertPropertyIntoTable(template, entry.Key, entry.Value, PropertyKind.DeviceProperty, propertyDefinitionsTable, timestamp);
        }

        foreach (var entry in template.DeviceSettings)
        {
            InsertPropertyIntoTable(template, entry.Key, entry.Value, PropertyKind.DeviceSetting, propertyDefinitionsTable, timestamp);
        }
    }

    var cs = ConfigurationManager.AppSettings["SQL_CONNECTIONSTRING"];
    using (SqlConnection conn = new SqlConnection(cs))
    {
        // Open asynchronously so the connection handshake does not block a thread;
        // the rest of this method already awaits its I/O.
        await conn.OpenAsync();

        // Each table is passed to its stored procedure as the "@tableType" parameter.
        log.Info($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - Inserting into table: {templatesTable.TableName}");
        using (SqlCommand cmd = new SqlCommand("dbo.[InsertDeviceTemplates]", conn) { CommandType = CommandType.StoredProcedure })
        {
            cmd.Parameters.Add(new SqlParameter("@tableType", templatesTable));
            var rows = await cmd.ExecuteNonQueryAsync();
            log.Info($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - Added/Updated {rows} rows to the database table {templatesTable.TableName}");
        }

        log.Info($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - Inserting into table: {measurementDefinitionsTable.TableName}");
        using (SqlCommand cmd = new SqlCommand("dbo.[InsertMeasurementDefinitions]", conn) { CommandType = CommandType.StoredProcedure })
        {
            cmd.Parameters.Add(new SqlParameter("@tableType", measurementDefinitionsTable));
            var rows = await cmd.ExecuteNonQueryAsync();
            log.Info($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - Added/Updated {rows} rows to the database table {measurementDefinitionsTable.TableName}");
        }

        log.Info($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - Inserting into table: {propertyDefinitionsTable.TableName}");
        using (SqlCommand cmd = new SqlCommand("dbo.[InsertPropertyDefinitions]", conn) { CommandType = CommandType.StoredProcedure })
        {
            cmd.Parameters.Add(new SqlParameter("@tableType", propertyDefinitionsTable));
            var rows = await cmd.ExecuteNonQueryAsync();
            log.Info($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - Added/Updated {rows} rows to the database table {propertyDefinitionsTable.TableName}");
        }
    }
}