Source/NuGetGallery.Operations/Tasks/CreateWarehouseReportsTask.cs (417 lines of code) (raw):

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using Newtonsoft.Json.Linq;
using NuGetGallery.Operations.Common;

namespace NuGetGallery.Operations
{
    /// <summary>
    /// Runs the download-report SQL scripts against the warehouse database and uploads the
    /// results as JSON blobs under "popularity/" in the "stats" blob container.
    /// </summary>
    [Command("createwarehousereports", "Create warehouse reports", AltName = "cwrep")]
    public class CreateWarehouseReportsTask : DatabaseAndStorageTask
    {
        private const string JsonContentType = "application/json";
        private const string PackageReportBaseName = "recentpopularity_";
        private const string NuGetClientVersion = "nugetclientversion";
        private const string Last6Months = "last6months";
        private const string RecentPopularity = "recentpopularity";
        private const string RecentPopularityDetail = "recentpopularitydetail";
        private const string PackageReportDetailBaseName = "recentpopularitydetail_";

        // Blob container and virtual folder that receive every report.
        private const string StatsContainerName = "stats";
        private const string BlobNamePrefix = "popularity/";

        // Timeout applied to every SQL command issued by this task (five minutes).
        private const int CommandTimeoutSeconds = 60 * 5;

        // Upper bound on concurrent per-package report generation, because each one queries SQL.
        private const int MaxSqlParallelism = 4;

        // Retry policy used by WithRetry.
        private const int MaxAttempts = 10;
        private const int RetryDelayMilliseconds = 20 * 1000;

        /// <summary>
        /// When set, per-package reports are regenerated for every package id instead of only
        /// for the packages returned by the GetPackagesForExport stored procedure.
        /// </summary>
        [Option("Re-create all reports", AltName = "all")]
        public bool All { get; set; }

        /// <summary>Selects the warehouse database connection of the given environment.</summary>
        protected override SqlConnectionStringBuilder GetConnectionFromEnvironment(DeploymentEnvironment environment)
        {
            return environment.WarehouseDatabase;
        }

        /// <summary>
        /// Entry point: ensures the container exists, writes the four aggregate reports, then
        /// writes per-package reports (all of them, or only dirty ones plus empty reports for
        /// inactive packages, depending on <see cref="All"/>).
        /// </summary>
        public override void ExecuteCommand()
        {
            Log.Info("Generate reports begin");

            CreateContainerIfNotExists();

            CreateReport_NuGetClientVersion();
            CreateReport_Last6Months();
            CreateReport_RecentPopularityDetail();
            CreateReport_RecentPopularity();

            if (All)
            {
                CreateAllPerPackageReports();
            }
            else
            {
                CreateDirtyPerPackageReports();
                ClearInactivePackageReports();
            }

            Log.Info("Generate reports end");
        }

        /// <summary>
        /// Runs one embedded SQL script and uploads its result set as "&lt;blobBaseName&gt;.json".
        /// </summary>
        /// <param name="scriptName">Resource name of the SQL script to execute.</param>
        /// <param name="blobBaseName">Blob name without the ".json" extension.</param>
        /// <returns>The column names and rows produced by the script.</returns>
        private Tuple<string[], List<object[]>> PublishReport(string scriptName, string blobBaseName)
        {
            Tuple<string[], List<object[]>> report = ExecuteSql(scriptName);
            CreateBlob(blobBaseName + ".json", JsonContentType, ReportHelpers.ToJson(report));
            return report;
        }

        private void CreateReport_NuGetClientVersion()
        {
            Log.Info("CreateReport_NuGetClientVersion");
            PublishReport("NuGetGallery.Operations.Scripts.DownloadReport_NuGetClientVersion.sql", NuGetClientVersion);
        }

        private void CreateReport_Last6Months()
        {
            Log.Info("CreateReport_Last6Months");
            PublishReport("NuGetGallery.Operations.Scripts.DownloadReport_Last6Months.sql", Last6Months);
        }

        private void CreateReport_RecentPopularityDetail()
        {
            Log.Info("CreateReport_RecentPopularityDetail");
            PublishReport("NuGetGallery.Operations.Scripts.DownloadReport_RecentPopularityDetail.sql", RecentPopularityDetail);
        }

        /// <summary>
        /// Publishes the recent-popularity report and then a per-package report for every
        /// package id listed in it.
        /// </summary>
        private void CreateReport_RecentPopularity()
        {
            Log.Info("CreateReport_RecentPopularity");
            Tuple<string[], List<object[]>> report =
                PublishReport("NuGetGallery.Operations.Scripts.DownloadReport_RecentPopularity.sql", RecentPopularity);
            CreatePerPackageReports(report);
        }

        /// <summary>
        /// Creates a per-package report, sequentially and with retry, for each row of
        /// <paramref name="report"/>.
        /// </summary>
        /// <param name="report">Result set that must contain a "PackageId" column.</param>
        /// <exception cref="InvalidOperationException">The result has no "PackageId" column.</exception>
        private void CreatePerPackageReports(Tuple<string[], List<object[]>> report)
        {
            Log.Info(string.Format("CreatePerPackageReports (count = {0})", report.Item2.Count));

            int indexOfPackageId = Array.IndexOf(report.Item1, "PackageId");
            if (indexOfPackageId < 0)
            {
                throw new InvalidOperationException("expected PackageId in result");
            }

            foreach (object[] row in report.Item2)
            {
                string packageId = row[indexOfPackageId].ToString();
                WithRetry(() => CreatePackageReport(packageId));
            }
        }

        /// <summary>Regenerates the per-package report of every package id in Dimension_Package.</summary>
        private void CreateAllPerPackageReports()
        {
            Log.Info("CreateAllPerPackageReports");

            // UTC so the elapsed time is not distorted by a local-time (DST) shift mid-run.
            DateTime before = DateTime.UtcNow;

            IList<string> packageIds = GetAllPackageIds();

            // limit the potential concurrency because this is against SQL
            ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = MaxSqlParallelism };

            Parallel.ForEach(packageIds, options, packageId =>
            {
                WithRetry(() => CreatePackageReport(packageId));
            });

            string msg = string.Format("CreateAllPerPackageReports complete {0} seconds", (DateTime.UtcNow - before).TotalSeconds);
            Log.Info(msg);
        }

        /// <summary>Returns every distinct package id found in Dimension_Package.</summary>
        private IList<string> GetAllPackageIds()
        {
            return QueryFirstColumnAsStrings("SELECT DISTINCT packageId FROM Dimension_Package");
        }

        /// <summary>
        /// Executes a text query and returns column 0 of every row, converted with ToString().
        /// </summary>
        /// <param name="sql">SQL text to execute against the task's connection string.</param>
        private IList<string> QueryFirstColumnAsStrings(string sql)
        {
            IList<string> values = new List<string>();

            using (SqlConnection connection = new SqlConnection(ConnectionString.ConnectionString))
            {
                connection.Open();

                using (SqlCommand command = new SqlCommand(sql, connection))
                {
                    command.CommandType = CommandType.Text;
                    command.CommandTimeout = CommandTimeoutSeconds;

                    using (SqlDataReader reader = command.ExecuteReader())
                    {
                        while (reader.Read())
                        {
                            values.Add(reader.GetValue(0).ToString());
                        }
                    }
                }
            }

            return values;
        }

        /// <summary>
        /// Regenerates the report of each package returned by GetPackagesForExport and confirms
        /// each export back to the database once its blob has been written.
        /// </summary>
        private void CreateDirtyPerPackageReports()
        {
            Log.Info("CreateDirtyPerPackageReports");

            // UTC so the elapsed time is not distorted by a local-time (DST) shift mid-run.
            DateTime before = DateTime.UtcNow;

            IList<Tuple<string, int>> packageIds = GetPackageIds();

            Log.Info(string.Format("Creating {0} Reports", packageIds.Count));

            // limit the potential concurrency because this is against SQL
            ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = MaxSqlParallelism };

            Parallel.ForEach(packageIds, options, packageId =>
            {
                // Report creation and confirmation are retried together as one unit.
                WithRetry(() =>
                {
                    CreatePackageReport(packageId.Item1);
                    ConfirmExport(packageId);
                });
            });

            string msg = string.Format("CreateDirtyPerPackageReports complete {0} seconds", (DateTime.UtcNow - before).TotalSeconds);
            Log.Info(msg);
        }

        /// <summary>
        /// Calls the GetPackagesForExport stored procedure and returns (package id, dirty count)
        /// pairs read from columns 0 and 1 of its result.
        /// </summary>
        private IList<Tuple<string, int>> GetPackageIds()
        {
            IList<Tuple<string, int>> packageIds = new List<Tuple<string, int>>();

            using (SqlConnection connection = new SqlConnection(ConnectionString.ConnectionString))
            {
                connection.Open();

                using (SqlCommand command = new SqlCommand("GetPackagesForExport", connection))
                {
                    command.CommandType = CommandType.StoredProcedure;
                    command.CommandTimeout = CommandTimeoutSeconds;

                    using (SqlDataReader reader = command.ExecuteReader())
                    {
                        while (reader.Read())
                        {
                            string packageId = reader.GetValue(0).ToString();
                            int dirtyCount = (int)reader.GetValue(1);
                            packageIds.Add(new Tuple<string, int>(packageId, dirtyCount));
                        }
                    }
                }
            }

            return packageIds;
        }

        /// <summary>Builds the blob name (with extension) of a package's detail report.</summary>
        private static string GetPackageReportBlobName(string packageId)
        {
            // All blob names use lower case identifiers in the NuGet Gallery Azure Blob Storage
            return PackageReportDetailBaseName + packageId.ToLowerInvariant() + ".json";
        }

        // for the initial release we will run New and Old reports in parallel
        // (the difference is that new reports contain more details)
        // then when we are happy with our new deployment we will drop the old

        /// <summary>Queries, builds and uploads the detail report of one package.</summary>
        private void CreatePackageReport(string packageId)
        {
            Log.Info(string.Format("CreatePackageReport for {0}", packageId));

            JObject report = CreateJsonContent(packageId);

            CreateBlob(GetPackageReportBlobName(packageId), JsonContentType, ReportHelpers.ToStream(report));
        }

        /// <summary>
        /// Runs the by-package detail script for <paramref name="packageId"/> and turns the rows
        /// into a report with download totals filled in and versions sorted by downloads.
        /// </summary>
        private JObject CreateJsonContent(string packageId)
        {
            Tuple<string[], List<object[]>> data = ExecuteSql(
                "NuGetGallery.Operations.Scripts.DownloadReport_RecentPopularityDetailByPackage.sql",
                new Tuple<string, int, string>("@packageId", 128, packageId));

            JObject content = MakeReportJson(data);
            TotalDownloads(content);
            SortItems(content);

            return content;
        }

        /// <summary>
        /// Groups result rows by package version into a JSON report.
        /// </summary>
        /// <remarks>
        /// Columns are read by position: 0 is the package version, 1 is compared against
        /// "NuGet"/"WebMatrix" to decide whether a client version is emitted, 2 is the client
        /// name, 3 and 4 are the two client version parts, 5 is the operation and 6 is the
        /// download count. The result has "Downloads" = 0 placeholders that
        /// <see cref="TotalDownloads"/> later overwrites, and "Items" keyed by version.
        /// </remarks>
        private static JObject MakeReportJson(Tuple<string[], List<object[]>> data)
        {
            JObject report = new JObject();
            report.Add("Downloads", 0);

            JObject items = new JObject();

            foreach (object[] row in data.Item2)
            {
                string packageVersion = (string)row[0];

                // Reuse the per-version node if an earlier row already created it.
                JObject childReport;
                JToken existing;
                if (items.TryGetValue(packageVersion, out existing))
                {
                    childReport = (JObject)existing;
                }
                else
                {
                    childReport = new JObject();
                    childReport.Add("Downloads", 0);
                    childReport.Add("Items", new JArray());
                    childReport.Add("Version", packageVersion);

                    items.Add(packageVersion, childReport);
                }

                JObject obj = new JObject();

                string clientCategory = row[1].ToString();
                string clientName = row[2].ToString();

                if (clientCategory == "NuGet" || clientCategory == "WebMatrix")
                {
                    obj.Add("Client", string.Format("{0} {1}.{2}", row[2], row[3], row[4]));
                    obj.Add("ClientName", clientName);
                    obj.Add("ClientVersion", string.Format("{0}.{1}", row[3], row[4]));
                }
                else
                {
                    obj.Add("Client", clientName);
                    obj.Add("ClientName", clientName);
                    obj.Add("ClientVersion", "");
                }

                // "(unknown)" operations are omitted from the output entirely.
                string operation = row[5].ToString();
                if (operation != "(unknown)")
                {
                    obj.Add("Operation", operation);
                }

                obj.Add("Downloads", (int)row[6]);

                ((JArray)childReport["Items"]).Add(obj);
            }

            report.Add("Items", items);

            return report;
        }

        /// <summary>
        /// Recursively sums "Downloads" over the children under "Items" (array or object),
        /// stores each sum on its parent node and returns it. A node without "Items" is a leaf
        /// and contributes its own "Downloads" value.
        /// </summary>
        private static int TotalDownloads(JObject report)
        {
            JToken token;
            if (!report.TryGetValue("Items", out token))
            {
                return (int)report["Downloads"];
            }

            int total = 0;

            JArray array = token as JArray;
            if (array != null)
            {
                foreach (JToken child in array)
                {
                    total += TotalDownloads((JObject)child);
                }
            }
            else
            {
                foreach (KeyValuePair<string, JToken> child in (JObject)token)
                {
                    total += TotalDownloads((JObject)child.Value);
                }
            }

            report["Downloads"] = total;
            return total;
        }

        /// <summary>
        /// Replaces the version-keyed "Items" object of <paramref name="report"/> with an array
        /// of copies of its values, ordered by "Downloads" descending.
        /// </summary>
        private static void SortItems(JObject report)
        {
            List<Tuple<int, JObject>> scratch = new List<Tuple<int, JObject>>();

            foreach (KeyValuePair<string, JToken> child in (JObject)report["Items"])
            {
                int downloads = (int)child.Value["Downloads"];
                scratch.Add(new Tuple<int, JObject>(downloads, new JObject((JObject)child.Value)));
            }

            // Arguments swapped relative to a normal comparison to get descending order.
            scratch.Sort((x, y) => y.Item1.CompareTo(x.Item1));

            JArray items = new JArray();
            foreach (Tuple<int, JObject> item in scratch)
            {
                items.Add(item.Item2);
            }

            report["Items"] = items;
        }

        /// <summary>Overwrites a package's detail report with an empty JSON object.</summary>
        private void CreateEmptyPackageReport(string packageId)
        {
            Log.Info(string.Format("CreateEmptyPackageReport for {0}", packageId));

            CreateBlob(GetPackageReportBlobName(packageId), JsonContentType, ReportHelpers.ToStream(new JObject()));
        }

        /// <summary>
        /// Writes an empty report, in parallel, for every package id returned by the
        /// ListInactive script.
        /// </summary>
        private void ClearInactivePackageReports()
        {
            Log.Info("ClearInactivePackageReports");

            IList<string> packageIds = GetInactivePackageIds();

            Log.Info(string.Format("Creating {0} empty Reports", packageIds.Count));

            Parallel.ForEach(packageIds, packageId =>
            {
                CreateEmptyPackageReport(packageId);
            });
        }

        /// <summary>Returns column 0 of the DownloadReport_ListInactive script's result.</summary>
        private IList<string> GetInactivePackageIds()
        {
            string sql = ResourceHelper.GetBatchFromSqlFile("NuGetGallery.Operations.Scripts.DownloadReport_ListInactive.sql");
            return QueryFirstColumnAsStrings(sql);
        }

        /// <summary>
        /// Calls the ConfirmPackageExported stored procedure with the package id and the dirty
        /// count that was read when the package was selected for export.
        /// </summary>
        /// <param name="packageId">(package id, dirty count) pair from <see cref="GetPackageIds"/>.</param>
        private void ConfirmExport(Tuple<string, int> packageId)
        {
            Log.Info(string.Format("ConfirmPackageExported for {0}", packageId.Item1));

            using (SqlConnection connection = new SqlConnection(ConnectionString.ConnectionString))
            {
                connection.Open();

                using (SqlCommand command = new SqlCommand("ConfirmPackageExported", connection))
                {
                    command.CommandType = CommandType.StoredProcedure;
                    command.CommandTimeout = CommandTimeoutSeconds;
                    command.Parameters.AddWithValue("PackageId", packageId.Item1);
                    command.Parameters.AddWithValue("DirtyCount", packageId.Item2);

                    command.ExecuteNonQuery();
                }
            }
        }

        /// <summary>
        /// Executes an embedded SQL script and materializes its whole result set.
        /// </summary>
        /// <param name="filename">Resource name passed to ResourceHelper.GetBatchFromSqlFile.</param>
        /// <param name="parameters">
        /// Optional NVarChar parameters as (name, size, value) triples.
        /// </param>
        /// <returns>
        /// Item1 holds the column names; Item2 holds one object[] per row, with database NULLs
        /// represented as <c>null</c>.
        /// </returns>
        private Tuple<string[], List<object[]>> ExecuteSql(string filename, params Tuple<string, int, string>[] parameters)
        {
            string sql = ResourceHelper.GetBatchFromSqlFile(filename);

            using (SqlConnection connection = new SqlConnection(ConnectionString.ConnectionString))
            {
                connection.Open();

                using (SqlCommand command = new SqlCommand(sql, connection))
                {
                    command.CommandType = CommandType.Text;
                    command.CommandTimeout = CommandTimeoutSeconds;

                    foreach (Tuple<string, int, string> parameter in parameters)
                    {
                        command.Parameters.Add(parameter.Item1, SqlDbType.NVarChar, parameter.Item2).Value = parameter.Item3;
                    }

                    using (SqlDataReader reader = command.ExecuteReader())
                    {
                        int fieldCount = reader.FieldCount;

                        string[] columns = new string[fieldCount];
                        for (int i = 0; i < fieldCount; i++)
                        {
                            columns[i] = reader.GetName(i);
                        }

                        List<object[]> rows = new List<object[]>();
                        while (reader.Read())
                        {
                            object[] row = new object[fieldCount];
                            for (int i = 0; i < fieldCount; i++)
                            {
                                row[i] = reader.IsDBNull(i) ? null : reader.GetValue(i);
                            }
                            rows.Add(row);
                        }

                        return new Tuple<string[], List<object[]>>(columns, rows);
                    }
                }
            }
        }

        /// <summary>
        /// Creates the "stats" container when missing and sets its public access level to Blob.
        /// </summary>
        private void CreateContainerIfNotExists()
        {
            CloudBlobClient blobClient = StorageAccount.CreateCloudBlobClient();
            CloudBlobContainer container = blobClient.GetContainerReference(StatsContainerName);

            container.CreateIfNotExists();  // this can throw if the container was just deleted a few seconds ago

            container.SetPermissions(new BlobContainerPermissions { PublicAccess = BlobContainerPublicAccessType.Blob });
        }

        /// <summary>
        /// Uploads <paramref name="content"/> to "popularity/&lt;name&gt;" in the "stats" container.
        /// </summary>
        /// <param name="name">Blob name relative to the "popularity/" prefix.</param>
        /// <param name="contentType">Value stored in the blob's ContentType property.</param>
        /// <param name="content">Stream to upload; it is not closed by this method.</param>
        /// <returns>The URI of the written blob.</returns>
        private Uri CreateBlob(string name, string contentType, Stream content)
        {
            CloudBlobClient blobClient = StorageAccount.CreateCloudBlobClient();
            CloudBlobContainer container = blobClient.GetContainerReference(StatsContainerName);
            CloudBlockBlob blockBlob = container.GetBlockBlobReference(BlobNamePrefix + name);

            blockBlob.Properties.ContentType = contentType;
            blockBlob.UploadFromStream(content);

            return blockBlob.Uri;
        }

        /// <summary>
        /// Invokes <paramref name="action"/> until it succeeds, making at most
        /// <see cref="MaxAttempts"/> attempts. After each failed attempt except the last, all
        /// SQL connection pools are cleared, the remaining attempt count is logged and the
        /// thread sleeps for <see cref="RetryDelayMilliseconds"/>.
        /// </summary>
        /// <param name="action">Operation to run; any exception type triggers a retry.</param>
        /// <remarks>
        /// The exception of the final attempt propagates to the caller with its original
        /// stack trace.
        /// </remarks>
        private void WithRetry(Action action)
        {
            int attempts = MaxAttempts;

            while (attempts-- > 0)
            {
                try
                {
                    action();
                    break;
                }
                catch (Exception)
                {
                    // attempts now holds the number of tries still available.
                    if (attempts == 0)
                    {
                        // "throw;" (not "throw e;") keeps the original stack trace.
                        throw;
                    }

                    SqlConnection.ClearAllPools();
                    Log.Info(string.Format("Retry attempts remaining {0}", attempts));
                    Thread.Sleep(RetryDelayMilliseconds);
                }
            }
        }
    }
}