public static async Task Run()

in Source/Apps/Microsoft/Released/Microsoft-IoTContinuousDataExportTemplate/Service/Data/devices/run.csx [23:163]


public static async Task Run(CloudBlockBlob myBlob, TraceWriter log)
{
    log.Info($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - Processing blob {myBlob.StorageUri}");

    await myBlob.FetchAttributesAsync();
    var timestamp = myBlob.Properties.LastModified.Value;

    var devices = new List<Device>();
    int parseFailCount = 0;

    using (var blobStream = new MemoryStream())
    {
        // Download blob content and save them to memory stream
        await myBlob.DownloadToStreamAsync(blobStream);
        blobStream.Position = 0;

        // Create generic Avro reader from memory stream
        using (var reader = AvroContainer.CreateGenericReader(blobStream))
        {
            // For one Avro Container, it may contains multiple blocks
            // Loop through each block within the container
            while (reader.MoveNext())
            {
                // Loop through Avro record inside the block and extract the fields
                foreach (AvroRecord record in reader.Current.Objects)
                {
                    try
                    {
                        var fields = record.Schema.Fields;
                        var deviceId = record.GetField<string>("id");

                        var connectionDeviceId = deviceId;
                        if (fields.Any(field => field.Name == "deviceId"))
                        {
                            connectionDeviceId = record.GetField<string>("deviceId");
                        }

                        var deviceName = record.GetField<string>("name");
                        var simulated = record.GetField<bool>("simulated");

                        var deviceTemplateRecord = record.GetField<AvroRecord>("deviceTemplate");
                        var templateId = deviceTemplateRecord.GetField<string>("id");
                        var templateVersion = deviceTemplateRecord.GetField<string>("version");

                        var propertiesRecord = record.GetField<AvroRecord>("properties");
                        var cloudProperties = propertiesRecord.GetField<IDictionary<string, dynamic>>("cloud");
                        var deviceProperties = propertiesRecord.GetField<IDictionary<string, dynamic>>("device");

                        var settingsRecord = record.GetField<AvroRecord>("settings");
                        var deviceSettings = settingsRecord.GetField<IDictionary<string, dynamic>>("device");

                        devices.Add(new Device()
                        {
                            DeviceId = deviceId,
                            ConnectionDeviceId = connectionDeviceId,
                            DeviceName = deviceName,
                            Simulated = simulated,
                            DeviceTemplateId = templateId,
                            DeviceTemplateVersion = templateVersion,
                            CloudProperties = cloudProperties,
                            DeviceProperties = deviceProperties,
                            DeviceSettings = deviceSettings
                        });
                    }
                    catch (Exception e)
                    {
                        log.Error($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - Failed to process Avro record");
                        log.Error($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - {e.ToString()}");
                        parseFailCount++;
                    }
                }
            }
        }
    }

    log.Info($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - Parsed {devices.Count} devices with {parseFailCount} failures");

    var devicesTable = CreateDevicesTable();
    var propertiesTable = CreatePropertiesTable();

    foreach (var device in devices)
    {
        var deviceRow = devicesTable.NewRow();
        deviceRow["deviceId"] = device.DeviceId;
        deviceRow["connectionDeviceId"] = device.ConnectionDeviceId;
        deviceRow["deviceTemplate"] = $"{device.DeviceTemplateId}/{device.DeviceTemplateVersion}";
        deviceRow["name"] = device.DeviceName;
        deviceRow["simulated"] = device.Simulated;
        deviceRow["timestamp"] = timestamp.UtcDateTime;

        devicesTable.Rows.Add(deviceRow);

        device.CloudProperties.ToList().ForEach(entry => ProcessingProperty(device, entry, PropertyKind.CloudProperty, propertiesTable, timestamp));
        device.DeviceProperties.ToList().ForEach(entry => ProcessingProperty(device, entry, PropertyKind.DeviceProperty, propertiesTable, timestamp));
        device.DeviceSettings.ToList().ForEach(entry => ProcessingProperty(device, entry, PropertyKind.DeviceSetting, propertiesTable, timestamp));
    }

    var cs = ConfigurationManager.AppSettings["SQL_CONNECTIONSTRING"];
    using (SqlConnection conn = new SqlConnection(cs))
    {
        conn.Open();

        log.Info($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - Inserting into table: {devicesTable.TableName}");
        using (SqlCommand cmd = new SqlCommand("dbo.[InsertDevices]", conn) { CommandType = CommandType.StoredProcedure })
        {
            cmd.Parameters.Add(new SqlParameter("@tableType", devicesTable));
            var stopWatch = System.Diagnostics.Stopwatch.StartNew();
            try
            {
                var rows = await cmd.ExecuteNonQueryAsync();
                stopWatch.Stop();
                log.Info($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - Added/Updated {rows} rows to the database. Elapsed: {stopWatch.Elapsed}");
            }
            catch (Exception exception)
            {
                stopWatch.Stop();
                log.Error($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - Elapsed: {stopWatch.Elapsed}", exception);
                throw;
            }
        }

        log.Info($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - Inserting into table: {propertiesTable.TableName}");
        using (SqlCommand cmd = new SqlCommand("dbo.[InsertProperties]", conn) { CommandType = CommandType.StoredProcedure })
        {
            cmd.Parameters.Add(new SqlParameter("@tableType", propertiesTable));
            var stopWatch = System.Diagnostics.Stopwatch.StartNew();
            try
            {
                var rows = await cmd.ExecuteNonQueryAsync();
                stopWatch.Stop();
                log.Info($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - Added/Updated {rows} rows to the database. Elapsed: {stopWatch.Elapsed}");
            }
            catch (Exception exception)
            {
                stopWatch.Stop();
                log.Error($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - Elapsed: {stopWatch.Elapsed}", exception);
                throw;
            }
        }
    }
}