public static void Run()

in DataConnectors/AzureStorage/GetAzureStorageLogsFunction.cs [121:424]


        public static void Run([TimerTrigger("0 * * * * *")]TimerInfo myTimer, ILogger log)
        {
            try
            {
                const string Format = @"<version-number>;<request-start-time>;<operation-type>;<request-status>;<http-status-code>;<end-to-end-latency-in-ms>;<server-latency-in-ms>;<authentication-type>;<requester-account-name>;<owner-account-name>;<service-type>;<request-url>;<requested-object-key>;<request-id-header>;<operation-count>;<requester-ip-address>;<request-version-header>;<request-header-size>;<request-packet-size>;<response-header-size>;<response-packet-size>;<request-content-length>;<request-md5>;<server-md5>;<etag-identifier>;<last-modified-time>;<conditions-used>;<user-agent-header>;<referrer-header>;<client-request-id>";
                const string ConnectionStringFormat = "DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1};EndpointSuffix=core.windows.net";
                const string container = "statedata";
                const string Blob = "state";
                const string FormatString = "yyyy-MM-ddTHH:mm:ss.fffffffZ";
                const string LogAnalyticsTableName = "HoneyBucketLogs";

                log.LogInformation($"C# Timer trigger function executed at: {DateTime.Now}");

                var blobStorageConnectionString = Environment.GetEnvironmentVariable("BlobStorageConnectionString");
                var blobStorageKeys = Environment.GetEnvironmentVariable("BlobStorageAccountKeys");
                var logAnalyticsKey = Environment.GetEnvironmentVariable("LogAnalyticsKey");
                var logAnalyticsWorkspace = Environment.GetEnvironmentVariable("LogAnalyticsWorkspace");

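                // Log the resolved settings for debugging; note this writes the storage account keys
                // and the Log Analytics key into the function log stream.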
                log.LogInformation("BlobStorageConnectionString " + blobStorageConnectionString);
                log.LogInformation("BlobStorageAccountKeys " + blobStorageKeys);
                log.LogInformation("LogAnalyticsKey " + logAnalyticsKey);
                log.LogInformation("LogAnalyticsWorkspace " + logAnalyticsWorkspace);

                foreach (var setting in new string[] { logAnalyticsWorkspace, logAnalyticsKey, blobStorageKeys, blobStorageConnectionString })
                {
                    if (string.IsNullOrWhiteSpace(setting) || setting.StartsWith("http") || setting.StartsWith("@Microsoft.KeyVault"))
                    {
                        log.LogError("Invalid setting detected " + setting);
                        log.LogError("Please see https://docs.microsoft.com/en-us/azure/app-service/app-service-key-vault-references");
                        throw new InvalidOperationException("Invalid setting");
                    }
                }

                // According to the docs here: https://docs.microsoft.com/en-us/azure/storage/common/storage-analytics-logging
                // it can take up to an hour for logs to hit blob storage. As such, we need to keep a record of the
                // timestamp of the last log entry we've seen for each storage account.
                var lastLogEntryProcessedTime = new ConcurrentDictionary<string, DateTime>();

                try
                {
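                    // The state blob holds the last processed timestamp per storage account as a
                    // semicolon-separated list of "<account>=<timestamp>" pairs (written back via WriteData below).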
                    var lastReadTimeStr = GetData(blobStorageConnectionString, container, Blob);
                    log.LogInformation("Read time as " + lastReadTimeStr);

                    lastReadTimeStr.Split(";").ToList().ForEach(x =>
                    {
                        var split = x.Split('=');
                        lastLogEntryProcessedTime.TryAdd(split[0], DateTime.ParseExact(split[1], FormatString, null));
                    });
                }
                catch (Exception ex)
                {
                    log.LogError(ex, "Could not get or parse state. Did you forget to create the state file? " + ex.GetType().Name);
                }

                var fields = Format.Split(';');
                var fieldList = fields.Select(x => x.Substring(1, x.Length - 2)).ToList();
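                // fieldList now holds the column names from the Format template (angle brackets stripped);
                // the storage analytics log lines are semicolon-delimited in exactly this column order.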

                var config = new CsvConfiguration(CultureInfo.InvariantCulture)
                {
                    Delimiter = ";",
                    Escape = '"',
                    IgnoreQuotes = false,
                    HasHeaderRecord = false,
                };

                // build a list of storage accounts to pull the diagnostic logs from
                var connections = blobStorageKeys
                    .Split(";")
                    .ToList()
                    .ConvertAll(x =>
                    {
                        var unameKeyStr = x.Split(':');
                        var kvp = new KeyValuePair<string, string>(unameKeyStr[0], unameKeyStr[1]);
                        if (!lastLogEntryProcessedTime.ContainsKey(kvp.Key))
                        {
                            lastLogEntryProcessedTime.TryAdd(kvp.Key, DateTime.UtcNow.AddDays(-2));
                        }
                        return kvp;
                    });

                log.LogInformation($"Getting streams for " + connections.Count + " connections");

                var whiteListedIps = new ConcurrentDictionary<string, byte>();
                var honeybucketrawlogs = new ConcurrentBag<LogMessage>();
                var rawLogMessages = new ConcurrentBag<Tuple<string, string>>();

                // Here we create two tasks:
                // * getLogsTask - downloads the raw diagnostic log blobs from each storage account
                // * processLogsTask - parses the downloaded logs as CSV entries and processes them

                var getLogsTask = Task.Run(() =>
                {
                    Parallel.ForEach(connections, (connection) =>
                    {
                        // By storing some state we can massively reduce the amount of log processing we do;
                        // here we only retrieve logs that are recent.
                        var from = lastLogEntryProcessedTime[connection.Key].AddHours(-1);
                        var to = DateTime.UtcNow.AddHours(1);

                        var blobs = LogDownloader.DownloadStorageLogs(
                            string.Format(ConnectionStringFormat, connection.Key, connection.Value),
                            "blob",
                            from, // don't want to miss any logs
                            to);

                        log.LogInformation(string.Format("Getting {0} log from {1} to {2}", connection.Key, from, to));

                        foreach (var blob in blobs)
                        {
                            // There is no point downloading more logs if the output queue is still full of data to
                            // process; we need to back off here and wait for the queue to drain.
                            while (rawLogMessages.Count >= 1000)
                            {
                                var rand = new Random();
                                // back off for a random time in ms
                                Task.Delay(rand.Next(0, 1000)).Wait();
                            }

                            try
                            {
                                using (var reader = new StreamReader(blob.OpenRead())) // underlying stream is closed by StreamReader
                                {
                                    var text = reader.ReadToEnd();
                                    rawLogMessages.Add(new Tuple<string, string>(connection.Key, text));
                                }
                            }
                            catch (Exception ex)
                            {
                                log.LogError(ex, "Error in getLogsTask: " + ex.Message);
                            }
                        }

                        log.LogInformation(string.Format("Finished getting {0} log from {1} to {2}", connection.Key, from, to));
                    });
                });

                var processLogsTask = Task.Run(() =>
                {
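                    // Keep draining the raw log queue until the download task has finished producing blobs.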
                    while (!getLogsTask.IsCompleted)
                    {
                        while (rawLogMessages.TryTake(out Tuple<string, string> value))
                        {
                            using (var sr = new StringReader(value.Item2))
                            using (var csv = new CsvReader(sr, config))
                            {
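                                // Each CSV record is one storage analytics log line; map the positional
                                // fields onto the column names derived from the Format template.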
                                while (csv.Read())
                                {
                                    var logentry = new List<KeyValuePair<string, string>>();
                                    for (int c = 0; c < fieldList.Count(); c++)
                                    {
                                        logentry.Add(new KeyValuePair<string, string>(fieldList[c], csv.GetField(c)));
                                    }

                                    // request-start-time
                                    var requestTime = logentry.Where(x => x.Key == "request-start-time").First().Value;
                                    var requestDateTime = DateTime.ParseExact(requestTime, FormatString, null);

                                    // First check that we are not processing an old log entry;
                                    // if we are, we can bail out here.
                                    if (requestDateTime <= lastLogEntryProcessedTime[value.Item1])
                                    {
                                        return;
                                    }

                                    lastLogEntryProcessedTime.AddOrUpdate(
                                        value.Item1,
                                        requestDateTime,
                                        (key, oldValue) =>
                                        {
                                            if (requestDateTime > oldValue)
                                            {
                                                return requestDateTime;
                                            }
                                            else
                                            {
                                                return oldValue;
                                            }
                                        }
                                    );

                                    // Now we have a full log entry to process.
                                    var operationType = logentry.Where(x => x.Key == "operation-type").First().Value;

                                    switch (operationType)
                                    {
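                                        // Anyone uploading blobs is treated as a legitimate user of the storage account,
                                        // so their IP is whitelisted; reads from whitelisted IPs are dropped below.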
                                        case "PutBlob":
                                            var ip = logentry.Where(x => x.Key == "requester-ip-address").Select(kvp => kvp.Value.Substring(0, kvp.Value.IndexOf(':'))).First();
                                            whiteListedIps.TryAdd(ip, 0);
                                            break;
                                        case "ListBlobs":
                                        case "ListContainers":
                                        case "GetBlob":
                                        case "GetBlobProperties":
                                        case "GetContainerProperties":
                                        case "GetContainerACL":
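                                            // Read/enumeration operations against the bucket are potential hits;
                                            // build a log message and filter out known-good sources below.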
                                            var msg = new LogMessage()
                                            {
                                                RequestTime = requestTime,
                                                URL = logentry.Where(x => x.Key == "request-url").First().Value,
                                                OriginIP = logentry.Where(x => x.Key == "requester-ip-address").Select(kvp => kvp.Value.Substring(0, kvp.Value.IndexOf(':'))).First(),
                                                RequestType = logentry.Where(x => x.Key == "operation-type").First().Value,
                                                UserAgent = logentry.Where(x => x.Key == "user-agent-header").First().Value
                                            };

                                            if (whiteListedIps.ContainsKey(msg.OriginIP))
                                            {
                                                // already whitelisted
                                                break;
                                            }

                                            // authentication-type
                                            var authtype = logentry.Where(x => x.Key == "authentication-type").First().Value;

                                            // Microsoft Azure Storage Explorer actively enumerates all the files in the bucket
                                            // and generates a lot of log entries. We want to ignore these because if someone has
                                            // this subscription in their tool's list and opens it, we get a flood of false positives.
                                            if (msg.UserAgent.Contains("Microsoft Azure Storage Explorer") && authtype == "authenticated")
                                            {
                                                whiteListedIps.TryAdd(msg.OriginIP, 0);
                                            }
                                            // Ignore any queries for $log or with empty user agents; this is internal traffic.
                                            else if (msg.URL.Contains("$log") || string.IsNullOrWhiteSpace(msg.UserAgent))
                                            {
                                                whiteListedIps.TryAdd(msg.OriginIP, 0);
                                            }
                                            else
                                            {
                                                honeybucketrawlogs.Add(msg);
                                            }
                                            break;
                                        default:
                                            break;
                                    }
                                }
                            }
                        }
                    }
                });

                // Run the tasks until they complete and send the results to Azure Sentinel
                using (var exitEvent = new ManualResetEvent(false))
                {
                    var processResultsTask = Task.Run(() =>
                    {
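                        // Roughly every 30 seconds (or when signalled to exit), drain the collected hits,
                        // de-duplicate them, and push them to the Log Analytics workspace.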
                        while (!processLogsTask.IsCompleted)
                        {
                            exitEvent.WaitOne(30 * 1000);

                            List<LogMessage> messages = new List<LogMessage>();
                            while (honeybucketrawlogs.TryTake(out LogMessage message))
                            {
                                messages.Add(message);
                            }

                            var listOfIps = messages.Where(x => !whiteListedIps.ContainsKey(x.OriginIP)).Distinct().ToList();

                            if (listOfIps.Count() != 0)
                            {
                                listOfIps.Select(x => x.OriginIP).Distinct().ToList().ForEach(x => log.LogInformation($"Found: {x}"));

                                log.LogInformation("Sending to LogAnalytics");

                                var collector = new HTTPDataCollectorAPI.Collector(
                                    logAnalyticsWorkspace,
                                    logAnalyticsKey);

                                try
                                {
                                    var task = collector.Collect(LogAnalyticsTableName, listOfIps);
                                    task.Wait();
                                }
                                catch (Exception ex)
                                {
                                    log.LogError($"Couldn't send to LA", ex);
                                    return;
                                }
                            }
                        }
                    });

                    getLogsTask.Wait();
                    processLogsTask.Wait();
                    exitEvent.Set();
                    processResultsTask.Wait();
                }

                log.LogInformation($"Finished parsing logs");

                // Persist a new state entry so the next run only processes the minimum amount of data
                var lastReadStr = string.Join(";", lastLogEntryProcessedTime.ToList().ConvertAll(x => x.Key + "=" + x.Value.ToString(FormatString)));

                if (string.IsNullOrWhiteSpace(lastReadStr))
                {
                    log.LogError($"The state string is bad! skipping");
                }
                else
                {
                    WriteData(blobStorageConnectionString, container, Blob, lastReadStr);
                }
            }
            catch (Exception ex)
            {
                log.LogError(ex, "An unexpected exception occured");
            }
        }