public static async Task Run()

in Source/Apps/Microsoft/Released/Microsoft-IoTContinuousDataExportTemplate/Service/Data/measurements/run.csx [24:203]


public static async Task Run(CloudBlockBlob myBlob, TraceWriter log, ExecutionContext context)
{
    log.Info($"{GetLogPrefix(context)} - Processing blob {myBlob.StorageUri}");

    await myBlob.FetchAttributesAsync();
    var timestamp = myBlob.Properties.LastModified.Value;
    int.TryParse(ConfigurationManager.AppSettings["SQL_CONNECTIONSTRING"], out int historyDataHours);
    if (historyDataHours > 0 && DateTime.UtcNow.Subtract(timestamp.UtcDateTime) > TimeSpan.FromHours(historyDataHours))
    {
        log.Info($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")} - Blob timestamp: {timestamp} older than {historyDataHours} hour, ignored");
        return;
    }

    var currentCount = System.Threading.Interlocked.Increment(ref counter);
    log.Info($"{GetLogPrefix(context)} - Concurrent job count: {currentCount}");

    IList<Message> messages = new List<Message>();
    int parseFailCount = 0;

    using (var blobStream = new MemoryStream())
    {
        var stopWatch = System.Diagnostics.Stopwatch.StartNew();
        // Download blob content and save them to memory stream
        await myBlob.DownloadToStreamAsync(blobStream);
        stopWatch.Stop();
        blobStream.Position = 0;
        log.Info($"{GetLogPrefix(context)} - Downloaded blob content. Length: {blobStream.Length}. Elapsed: {stopWatch.Elapsed}");

        // Create Avro reader from memory stream
        using (var reader = AvroContainer.CreateGenericReader(blobStream))
        {
            // Loop through blocks within the container
            while (reader.MoveNext())
            {
                // Loop through Avro record inside the block and get all the fields
                foreach (AvroRecord record in reader.Current.Objects)
                {
                    try
                    {
                        var messageId = Guid.NewGuid();
                        var systemProperties = record.GetField<IDictionary<string, object>>("SystemProperties");
                        var deviceId = systemProperties["connectionDeviceId"] as string;
                        var enqueueTime = DateTime.Parse(record.GetField<string>("EnqueuedTimeUtc"));

                        using (var stream = new MemoryStream(record.GetField<byte[]>("Body")))
                        {
                            using (var streamReader = new StreamReader(stream, Encoding.UTF8))
                            {
                                try
                                {
                                    var body = JsonSerializer.Create().Deserialize(streamReader, typeof(IDictionary<string, dynamic>)) as IDictionary<string, dynamic>;
                                    messages.Add(new Message
                                    {
                                        messageId = messageId,
                                        timestamp = enqueueTime,
                                        deviceId = deviceId,
                                        values = body,
                                        messageSize = (int)stream.Length
                                    });
                                }
                                catch (Exception e)
                                {
                                    log.Error($"{GetLogPrefix(context)} - Failed to process the body for device {deviceId}");
                                    log.Error($"{GetLogPrefix(context)} - {e.ToString()}");
                                    parseFailCount++;
                                }
                            }
                        }
                    }
                    catch (Exception e)
                    {
                        log.Error($"{GetLogPrefix(context)} - Failed to process Avro record");
                        log.Error($"{GetLogPrefix(context)} - {e.ToString()}");
                        parseFailCount++;
                    }
                }
            }
        }
    }

    log.Info($"{GetLogPrefix(context)} - Parsed {messages.Count} messages with {parseFailCount} failures");

    var measurementsTable = CreateMeasurementsTable();
    var messagesTable = CreateMessagesTable();
    foreach (var message in messages)
    {
        var messageRow = messagesTable.NewRow();
        messageRow["id"] = message.messageId;
        messageRow["deviceId"] = message.deviceId;
        messageRow["timestamp"] = message.timestamp;
        messageRow["size"] = message.messageSize;
        messagesTable.Rows.Add(messageRow);

        foreach (KeyValuePair<string, dynamic> entry in message.values)
        {
            var row = measurementsTable.NewRow();
            row["messageId"] = message.messageId;
            row["deviceId"] = message.deviceId;
            row["timestamp"] = message.timestamp;
            row["field"] = entry.Key;

            switch (entry.Value)
            {
                case bool _:
                    row["booleanValue"] = bool.Parse(entry.Value.ToString());
                    break;

                case int _:
                case Int64 _:
                case double _:
                case float _:
                    row["numericValue"] = float.Parse(entry.Value.ToString());
                    break;

                case null:
                    break;

                default:
                    row["stringValue"] = entry.Value.ToString();
                    break;
            }

            measurementsTable.Rows.Add(row);
        }
    }


    var cs = ConfigurationManager.AppSettings["SQL_CONNECTIONSTRING"];
    using (SqlConnection conn = new SqlConnection(cs))
    {
        conn.Open();

        using (SqlBulkCopy bulkCopy = new SqlBulkCopy(conn) { BulkCopyTimeout = 120 })
        {
            log.Info($"{GetLogPrefix(context)} - Inserting into table: {messagesTable.TableName}");
            bulkCopy.DestinationTableName = "analytics.Messages";
            var stopWatch = System.Diagnostics.Stopwatch.StartNew();
            try
            {
                await bulkCopy.WriteToServerAsync(messagesTable);
                stopWatch.Stop();
                log.Info($"{GetLogPrefix(context)} - Added {messagesTable.Rows.Count} rows to the database table {messagesTable.TableName}. Elapsed: {stopWatch.Elapsed}");
            }
            catch (Exception exception)
            {
                stopWatch.Stop();
                log.Error($"{GetLogPrefix(context)} - Elapsed: {stopWatch.Elapsed} - database table {messagesTable.TableName}", exception);
                System.Threading.Interlocked.Decrement(ref counter);
                throw;
            }
        }

        using (SqlBulkCopy bulkCopy = new SqlBulkCopy(conn) { BulkCopyTimeout = 120 })
        {
            foreach (DataColumn column in measurementsTable.Columns)
            {
                bulkCopy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
            }

            log.Info($"{GetLogPrefix(context)} - Inserting into table: {measurementsTable.TableName}");
            bulkCopy.DestinationTableName = "stage.Measurements";
            var stopWatch = System.Diagnostics.Stopwatch.StartNew();
            try
            {
                await bulkCopy.WriteToServerAsync(measurementsTable);
                stopWatch.Stop();
                log.Info($"{GetLogPrefix(context)} - Added {measurementsTable.Rows.Count} rows to the database table {measurementsTable.TableName}. Elapsed: {stopWatch.Elapsed}");
            }
            catch (Exception exception)
            {
                stopWatch.Stop();
                log.Error($"{GetLogPrefix(context)} - Elapsed: {stopWatch.Elapsed} - database table {measurementsTable.TableName}", exception);
                System.Threading.Interlocked.Decrement(ref counter);
                throw;
            }
        }

        System.Threading.Interlocked.Decrement(ref counter);
    }
}