tools/pipeline-witness/Azure.Sdk.Tools.PipelineWitness/GitHubActions/GitHubActionProcessor.cs (373 lines of code) (raw):

using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;

using Azure; // required for AsyncPageable<T> and RequestFailedException used below
using Azure.Sdk.Tools.PipelineWitness.Utilities;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;

using Microsoft.Extensions.Logging;

using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using Newtonsoft.Json.Serialization;

using Octokit;

namespace Azure.Sdk.Tools.PipelineWitness.GitHubActions
{
    /// <summary>
    /// Extracts a completed GitHub Actions workflow run — the run itself, its jobs,
    /// its steps, and its logs — and uploads each facet as a JSON-lines blob into a
    /// dedicated Azure Blob Storage container. Blobs are write-once: an existing blob
    /// signals that the facet was already processed and is skipped.
    /// </summary>
    public partial class GitHubActionProcessor
    {
        private const string RunsContainerName = "githubactionsruns";
        private const string JobsContainerName = "githubactionsjobs";
        private const string StepsContainerName = "githubactionssteps";
        private const string LogsContainerName = "githubactionslogs";

        // Fixed-precision UTC timestamp format used for every serialized date field.
        private const string TimeFormat = @"yyyy-MM-dd\THH:mm:ss.fffffff\Z";

        // Entries in the downloaded log archive are named either
        // "{index}_{jobName}.txt" (whole-job log, no folder) or
        // "{jobName}/{stepNumber}_{stepName}.txt" (per-step log).
        [GeneratedRegex(@"^(?:(?<folder>.*)\/)?(?<index>\d+)_(?<name>[^\/]+)\.txt$")]
        private static partial Regex LogFilePathRegex();

        // camelCase property names and camelCase enum strings, compact output.
        private static readonly JsonSerializerSettings jsonSettings = new()
        {
            ContractResolver = new CamelCasePropertyNamesContractResolver(),
            Converters = { new StringEnumConverter(new CamelCaseNamingStrategy()) },
            Formatting = Formatting.None,
        };

        private readonly ILogger<GitHubActionProcessor> logger;
        private readonly GitHubClientFactory clientFactory;
        private readonly BlobContainerClient runsContainerClient;
        private readonly BlobContainerClient jobsContainerClient;
        private readonly BlobContainerClient stepsContainerClient;
        private readonly BlobContainerClient logsContainerClient;

        /// <summary>
        /// Initializes a new instance of the <see cref="GitHubActionProcessor"/> class.
        /// </summary>
        /// <param name="logger">Logger for progress and error reporting.</param>
        /// <param name="blobServiceClient">Client for the storage account that holds the target containers.</param>
        /// <param name="clientFactory">Factory that produces authenticated GitHub clients per repository.</param>
        /// <exception cref="ArgumentNullException">Thrown when any argument is null.</exception>
        public GitHubActionProcessor(ILogger<GitHubActionProcessor> logger, BlobServiceClient blobServiceClient, GitHubClientFactory clientFactory)
        {
            ArgumentNullException.ThrowIfNull(logger);
            ArgumentNullException.ThrowIfNull(blobServiceClient);
            ArgumentNullException.ThrowIfNull(clientFactory);

            this.logger = logger;
            this.clientFactory = clientFactory;
            this.runsContainerClient = blobServiceClient.GetBlobContainerClient(RunsContainerName);
            this.jobsContainerClient = blobServiceClient.GetBlobContainerClient(JobsContainerName);
            this.stepsContainerClient = blobServiceClient.GetBlobContainerClient(StepsContainerName);
            this.logsContainerClient = blobServiceClient.GetBlobContainerClient(LogsContainerName);
        }

        /// <summary>
        /// Processes a workflow run: the latest attempt first, then every earlier attempt
        /// (attempts 1 .. RunAttempt-1) fetched individually from the API.
        /// </summary>
        /// <param name="owner">Repository owner (org or user login).</param>
        /// <param name="repository">Repository name.</param>
        /// <param name="runId">Workflow run id.</param>
        public async Task ProcessAsync(string owner, string repository, long runId)
        {
            IGitHubClient client = await this.clientFactory.CreateGitHubClientAsync(owner, repository);

            // The plain Get returns the latest attempt of the run.
            WorkflowRun run = await GetWorkflowRunAsync(client, owner, repository, runId);
            await ProcessWorkflowRunAsync(client, run);

            // Earlier attempts are immutable and must be fetched one at a time.
            for (long attempt = 1; attempt < run.RunAttempt; attempt++)
            {
                WorkflowRun runAttempt = await client.Actions.Workflows.Runs.GetAttempt(owner, repository, runId, attempt);
                await ProcessWorkflowRunAsync(client, runAttempt);
            }
        }

        /// <summary>
        /// Lists the names of already-uploaded run blobs for a repository whose date folder
        /// falls between <paramref name="minTime"/> and <paramref name="maxTime"/> (inclusive, by UTC day).
        /// </summary>
        /// <param name="repository">Repository full name ("owner/name").</param>
        /// <param name="minTime">Start of the window.</param>
        /// <param name="maxTime">End of the window.</param>
        /// <param name="cancellationToken">Token to cancel blob enumeration.</param>
        /// <returns>All matching blob names in the runs container.</returns>
        public async Task<string[]> GetRunBlobNamesAsync(string repository, DateTimeOffset minTime, DateTimeOffset maxTime, CancellationToken cancellationToken)
        {
            DateTimeOffset minDay = minTime.ToUniversalTime().Date;
            DateTimeOffset maxDay = maxTime.ToUniversalTime().Date;

            DateTimeOffset[] days = Enumerable.Range(0, (int)(maxDay - minDay).TotalDays + 1)
                .Select(offset => minDay.AddDays(offset))
                .ToArray();

            List<string> blobNames = [];

            // Blobs are organized by "{repository}/{yyyy/MM/dd}/", so each day is one prefix query.
            foreach (DateTimeOffset day in days)
            {
                string blobPrefix = $"{repository}/{day:yyyy/MM/dd}/".ToLower();
                AsyncPageable<BlobItem> blobs = this.runsContainerClient.GetBlobsAsync(prefix: blobPrefix, cancellationToken: cancellationToken);

                await foreach (BlobItem blob in blobs)
                {
                    blobNames.Add(blob.Name);
                }
            }

            return blobNames.ToArray();
        }

        /// <summary>
        /// Builds the deterministic blob name for a run attempt:
        /// "{repository}/{yyyy/MM/dd}/{runId}-{attempt}.jsonl" (lowercased).
        /// The same relative path is used in all four containers.
        /// </summary>
        /// <param name="run">The workflow run (a specific attempt).</param>
        /// <returns>The lowercased blob name.</returns>
        public string GetRunBlobName(WorkflowRun run)
        {
            string repository = run.Repository.FullName;
            long runId = run.Id;
            long attempt = run.RunAttempt;
            DateTimeOffset runStartedAt = run.RunStartedAt;

            string blobName = $"{repository}/{runStartedAt:yyyy/MM/dd}/{runId}-{attempt}.jsonl".ToLower();

            return blobName;
        }

        // Uploads all facets for one run attempt. Order matters: the run blob is written
        // last so its existence signals that processing of the attempt is complete.
        private async Task ProcessWorkflowRunAsync(IGitHubClient client, WorkflowRun run)
        {
            Workflow workflow = await GetWorkflowAsync(client, run);
            List<WorkflowJob> jobs = await GetJobsAsync(client, run);

            await UploadJobsBlobAsync(workflow, run, jobs);
            await UploadStepsBlobAsync(workflow, run, jobs);
            await UploadLogsBlobAsync(client, workflow, run, jobs);

            // We upload the run blob last. This allows us to use the existence of the blob as a signal that run processing is complete.
            await UploadRunBlobAsync(workflow, run);
        }

        // Serializes the run attempt as a single JSON document and uploads it to the runs container.
        private async Task UploadRunBlobAsync(Workflow workflow, WorkflowRun run)
        {
            string repository = run.Repository.FullName;
            long workflowId = workflow.Id;
            string workflowName = workflow.Name;
            long runId = run.Id;
            string runName = run.Name;
            long attempt = run.RunAttempt;

            try
            {
                // even though runid/attempt is unique, we still add a date component to the path for easier browsing
                // multiple attempts have the same runStartedAt, so the different attempt blobs will be in the same folder
                string blobPath = GetRunBlobName(run);
                BlobClient blobClient = this.runsContainerClient.GetBlobClient(blobPath);

                if (await blobClient.ExistsAsync())
                {
                    this.logger.LogInformation("Skipping existing workflow run for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, workflowName, runId, attempt);
                    return;
                }

                this.logger.LogInformation("Processing workflow run for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, workflowName, runId, attempt);

                string content = JsonConvert.SerializeObject(
                    new
                    {
                        Repository = repository,
                        WorkflowId = workflowId,
                        WorkflowName = workflowName,
                        RunId = runId,
                        RunName = runName,
                        run.RunNumber,
                        run.HeadBranch,
                        run.HeadSha,
                        // FIX: previously serialized as "runAttenpt" (typo). Downstream
                        // consumers mapping the misspelled column must be migrated.
                        RunAttempt = attempt,
                        run.Event,
                        Status = run.Status.StringValue,
                        Conclusion = run.Conclusion?.StringValue,
                        run.CheckSuiteId,
                        run.DisplayTitle,
                        run.Path,
                        RunStartedAt = run.RunStartedAt.ToString(TimeFormat),
                        CreatedAt = run.CreatedAt.ToString(TimeFormat),
                        UpdatedAt = run.UpdatedAt.ToString(TimeFormat),
                        run.NodeId,
                        run.CheckSuiteNodeId,
                        HeadRepository = run.HeadRepository?.FullName,
                        run.Url,
                        run.HtmlUrl,
                        EtlIngestDate = DateTime.UtcNow.ToString(TimeFormat),
                    }, jsonSettings);

                await blobClient.UploadAsync(new BinaryData(content));
            }
            catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.Conflict)
            {
                // Another worker uploaded the blob between the existence check and the upload.
                this.logger.LogInformation("Ignoring existing blob exception for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, workflowName, runId, attempt);
            }
            catch (Exception ex)
            {
                this.logger.LogError(ex, "Error processing repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, workflowName, runId, attempt);
                throw;
            }
        }

        // Serializes one JSON line per job and uploads the batch to the jobs container.
        private async Task UploadJobsBlobAsync(Workflow workflow, WorkflowRun run, List<WorkflowJob> jobs)
        {
            string repository = run.Repository.FullName;
            long workflowId = workflow.Id;
            string workflowName = workflow.Name;
            long runId = run.Id;
            string runName = run.Name;
            long attempt = run.RunAttempt;

            try
            {
                // Same run-relative path as the runs container.
                string blobPath = GetRunBlobName(run);
                BlobClient blobClient = this.jobsContainerClient.GetBlobClient(blobPath);

                if (await blobClient.ExistsAsync())
                {
                    this.logger.LogInformation("Skipping existing workflow jobs for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, workflowName, runId, attempt);
                    return;
                }

                this.logger.LogInformation("Processing workflow jobs for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, workflowName, runId, attempt);

                StringBuilder builder = new();
                foreach (var job in jobs)
                {
                    builder.AppendLine(JsonConvert.SerializeObject(
                        new
                        {
                            Repository = repository,
                            WorkflowId = workflowId,
                            WorkflowName = workflowName,
                            RunId = runId,
                            RunName = runName,
                            RunAttempt = attempt,
                            JobId = job.Id,
                            JobName = job.Name,
                            Status = job.Status.StringValue,
                            Conclusion = job.Conclusion?.StringValue,
                            CreatedAt = job.CreatedAt?.ToString(TimeFormat),
                            StartedAt = job.StartedAt.ToString(TimeFormat),
                            CompletedAt = job.CompletedAt?.ToString(TimeFormat),
                            job.NodeId,
                            job.HeadSha,
                            job.Labels,
                            job.RunnerId,
                            job.RunnerName,
                            job.RunnerGroupId,
                            job.RunnerGroupName,
                            job.HtmlUrl,
                            EtlIngestDate = DateTime.UtcNow.ToString(TimeFormat),
                        }, jsonSettings));
                }

                await blobClient.UploadAsync(new BinaryData(builder.ToString()));
            }
            catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.Conflict)
            {
                this.logger.LogInformation("Ignoring existing blob exception for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, workflowName, runId, attempt);
            }
            catch (Exception ex)
            {
                this.logger.LogError(ex, "Error processing repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, workflowName, runId, attempt);
                throw;
            }
        }

        // Serializes one JSON line per step (across all jobs) and uploads the batch to the steps container.
        private async Task UploadStepsBlobAsync(Workflow workflow, WorkflowRun run, List<WorkflowJob> jobs)
        {
            string repository = run.Repository.FullName;
            long workflowId = workflow.Id;
            string workflowName = workflow.Name;
            long runId = run.Id;
            string runName = run.Name;
            long attempt = run.RunAttempt;

            try
            {
                // logs with a given runId/attempt are immutable and retries add new attempts.
                // even though runid/attempt is unique, we still add a date component to the path for easier browsing
                // multiple attempts have the same runStartedAt, so the different attempt blobs will be in the same folder
                string blobPath = GetRunBlobName(run);
                BlobClient blobClient = this.stepsContainerClient.GetBlobClient(blobPath);

                if (await blobClient.ExistsAsync())
                {
                    this.logger.LogInformation("Skipping existing workflow steps for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, workflowName, runId, attempt);
                    return;
                }

                this.logger.LogInformation("Processing workflow steps for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, workflowName, runId, attempt);

                StringBuilder builder = new();
                foreach (var job in jobs)
                {
                    foreach (var step in job.Steps)
                    {
                        builder.AppendLine(JsonConvert.SerializeObject(
                            new
                            {
                                Repository = repository,
                                WorkflowId = workflowId,
                                WorkflowName = workflowName,
                                RunId = runId,
                                RunName = runName,
                                RunAttempt = attempt,
                                JobId = job.Id,
                                JobName = job.Name,
                                StepNumber = step.Number,
                                StepName = step.Name,
                                Status = step.Status.StringValue,
                                Conclusion = step.Conclusion?.StringValue,
                                StartedAt = step.StartedAt?.ToString(TimeFormat),
                                CompletedAt = step.CompletedAt?.ToString(TimeFormat),
                                EtlIngestDate = DateTime.UtcNow.ToString(TimeFormat),
                            }, jsonSettings));
                    }
                }

                await blobClient.UploadAsync(new BinaryData(builder.ToString()));
            }
            catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.Conflict)
            {
                // FIX: pass workflowName (not runName) for the {Workflow} placeholder, matching sibling methods.
                this.logger.LogInformation("Ignoring existing blob exception for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, workflowName, runId, attempt);
            }
            catch (Exception ex)
            {
                this.logger.LogError(ex, "Error processing repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, workflowName, runId, attempt);
                throw;
            }
        }

        // Downloads the log archive for the run attempt, flattens job and step logs into
        // one JSON line per log line, and streams them to the logs container.
        private async Task UploadLogsBlobAsync(IGitHubClient client, Workflow workflow, WorkflowRun run, List<WorkflowJob> jobs)
        {
            string repository = run.Repository.FullName;
            long workflowId = workflow.Id;
            string workflowName = workflow.Name;
            long runId = run.Id;
            long attempt = run.RunAttempt;

            try
            {
                // logs with a given runId/attempt are immutable and retries add new attempts.
                // even though runid/attempt is unique, we still add a date component to the path for easier browsing
                // multiple attempts have the same runStartedAt, so the different attempt blobs will be in the same folder
                string blobPath = GetRunBlobName(run);
                BlobClient blobClient = this.logsContainerClient.GetBlobClient(blobPath);

                if (await blobClient.ExistsAsync())
                {
                    this.logger.LogInformation("Skipping existing log for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, workflowName, runId, attempt);
                    return;
                }

                this.logger.LogInformation("Processing log for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, workflowName, runId, attempt);

                using ZipArchive archive = await GetLogsAsync(client, run);

                // Index archive entries: whole-job logs by job name, step logs by "jobName/stepNumber".
                var logEntries = archive.Entries
                    .Select(x => new { Entry = x, NameRegex = LogFilePathRegex().Match(x.FullName), })
                    .Where(x => x.NameRegex.Success)
                    .Select(x => new
                    {
                        ParentName = x.NameRegex.Groups["folder"].Value,
                        Index = x.NameRegex.Groups["index"].Value,
                        RecordName = x.NameRegex.Groups["name"].Value,
                        x.Entry
                    })
                    .ToDictionary(x => string.IsNullOrEmpty(x.ParentName) ? x.RecordName : $"{x.ParentName}/{x.Index}", x => x.Entry);

                await using Stream blobStream = await blobClient.OpenWriteAsync(overwrite: true, new BlobOpenWriteOptions());
                await using StreamWriter blobWriter = new(blobStream);

                long characterCount = 0;
                int lineCount = 0;
                foreach (var job in jobs)
                {
                    // Retries may not run all jobs and skipped jobs will not have logs
                    // The jobs still appear in the API response, but their runnerName is empty
                    bool isRetrySkipped = string.IsNullOrEmpty(job.RunnerName) && attempt > 1;

                    if (!logEntries.TryGetValue(job.Name, out ZipArchiveEntry jobEntry))
                    {
                        if (!isRetrySkipped)
                        {
                            // All jobs in the first attempt or with runner names should have logs
                            this.logger.LogWarning("Missing log entry for job {JobName}", job.Name);
                        }

                        continue;
                    }

                    IList<LogLine> logLines = ReadLogLines(jobEntry, step: 0, job.StartedAt);

                    IList<LogLine> stepLines = job.Steps
                        .Where(x => x.Conclusion != WorkflowJobConclusion.Skipped)
                        .OrderBy(x => x.Number)
                        .SelectMany(step => ReadLogLines(logEntries[$"{job.Name}/{step.Number}"], step.Number, step.StartedAt ?? job.StartedAt))
                        .ToArray();

                    // Attribute job-log lines to the step that produced them.
                    UpdateStepLines(logLines, stepLines);

                    foreach (LogLine logLine in logLines)
                    {
                        characterCount += logLine.Message.Length;
                        lineCount += 1;
                        await blobWriter.WriteLineAsync(JsonConvert.SerializeObject(new
                        {
                            Repository = repository,
                            WorkflowId = workflowId,
                            WorkflowName = workflowName,
                            RunId = runId,
                            RunAttempt = attempt,
                            JobId = job.Id,
                            StepNumber = logLine.Step,
                            LineNumber = logLine.Number,
                            logLine.Message.Length,
                            Timestamp = logLine.Timestamp.ToString(TimeFormat),
                            logLine.Message,
                            EtlIngestDate = DateTime.UtcNow.ToString(TimeFormat),
                        }, jsonSettings));
                    }
                }

                this.logger.LogInformation("Processed {CharacterCount} characters and {LineCount} lines for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", characterCount, lineCount, repository, workflowName, runId, attempt);
            }
            catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.Conflict)
            {
                this.logger.LogInformation("Ignoring existing blob exception for repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, workflowName, runId, attempt);
            }
            catch (Exception ex)
            {
                this.logger.LogError(ex, "Error processing repository {Repository}, workflow {Workflow}, run {RunId}, attempt {Attempt}", repository, workflowName, runId, attempt);
                throw;
            }
        }

        /// <summary>
        /// Finds the contiguous window inside <paramref name="jobLines"/> whose messages
        /// match <paramref name="stepLines"/> exactly, then stamps those job lines with the
        /// step number, per-step line number, and step timestamps.
        /// </summary>
        /// <param name="jobLines">Lines read from the whole-job log (step 0).</param>
        /// <param name="stepLines">Lines read from the per-step logs, in step order.</param>
        /// <returns>true if a matching window was found (or there were no step lines); otherwise false.</returns>
        private static bool UpdateStepLines(IList<LogLine> jobLines, IList<LogLine> stepLines)
        {
            // For each line in the step, remove the corresponding line from the job
            if (stepLines.Count == 0)
            {
                return true;
            }

            // seek to the first line in the job that is after the first line in the step
            for (int jobIndex = 0; jobIndex < jobLines.Count - stepLines.Count + 1; jobIndex++)
            {
                var isMatch = true;
                for (var stepIndex = 0; isMatch && stepIndex < stepLines.Count; stepIndex++)
                {
                    var stepLine = stepLines[stepIndex];
                    var jobLine = jobLines[jobIndex + stepIndex];
                    if (jobLine.Message != stepLine.Message)
                    {
                        isMatch = false;
                    }
                }

                if (isMatch)
                {
                    // Replace the step number and timestamp with the values from the step log
                    for (var stepIndex = 0; stepIndex < stepLines.Count; stepIndex++)
                    {
                        var stepLine = stepLines[stepIndex];
                        var jobLine = jobLines[jobIndex + stepIndex];
                        jobLine.Step = stepLine.Step;
                        jobLine.Number = stepLine.Number;
                        jobLine.Timestamp = stepLine.Timestamp;
                    }

                    return true;
                }
            }

            return false;
        }

        // Reads an archive entry into LogLine records. Lines without their own timestamp
        // inherit the previous line's timestamp, seeded with logStartTime.
        private static List<LogLine> ReadLogLines(ZipArchiveEntry entry, int step, DateTimeOffset logStartTime)
        {
            var result = new List<LogLine>();
            using var logReader = new StreamReader(entry.Open());

            DateTimeOffset lastTimestamp = logStartTime;
            for (int lineNumber = 1; !logReader.EndOfStream; lineNumber++)
            {
                string line = logReader.ReadLine();
                var (timestamp, message) = StringUtilities.ParseLogLine(line, lastTimestamp);
                lastTimestamp = timestamp;
                result.Add(new LogLine { Step = step, Number = lineNumber, Timestamp = timestamp, Message = message });
            }

            return result;
        }

        private static async Task<WorkflowRun> GetWorkflowRunAsync(IGitHubClient client, string owner, string repository, long runId)
        {
            WorkflowRun workflowRun = await client.Actions.Workflows.Runs.Get(owner, repository, runId);
            return workflowRun;
        }

        private static async Task<Workflow> GetWorkflowAsync(IGitHubClient client, WorkflowRun run)
        {
            Workflow workflow = await client.Actions.Workflows.Get(run.Repository.Owner.Login, run.Repository.Name, run.WorkflowId);
            return workflow;
        }

        // Pages through the jobs API (100 per page) until an empty page is returned.
        private static async Task<List<WorkflowJob>> GetJobsAsync(IGitHubClient client, WorkflowRun run)
        {
            List<WorkflowJob> jobs = [];

            for (int pageNumber = 1; ; pageNumber++)
            {
                ApiOptions options = new() { PageSize = 100, PageCount = 1, StartPage = pageNumber };
                WorkflowJobsResponse jobsResponse = await client.Actions.Workflows.Jobs.List(run.Repository.Owner.Login, run.Repository.Name, run.Id, (int)run.RunAttempt, options);

                IReadOnlyList<WorkflowJob> pageJobs = jobsResponse.Jobs;
                if (pageJobs.Count == 0)
                {
                    break;
                }

                jobs.AddRange(pageJobs);
            }

            return jobs;
        }

        // Downloads the zipped logs for the run attempt and opens them as a readable archive.
        private static async Task<ZipArchive> GetLogsAsync(IGitHubClient client, WorkflowRun run)
        {
            var logBytes = await client.Actions.Workflows.Runs.GetAttemptLogs(run.Repository.Owner.Login, run.Repository.Name, run.Id, run.RunAttempt);
            return new ZipArchive(new MemoryStream(logBytes), ZipArchiveMode.Read, false);
        }

        // Mutable record for a single log line; Step 0 means "job-level, not yet matched to a step".
        private class LogLine
        {
            public int Step;
            public int Number;
            public DateTimeOffset Timestamp;
            public string Message;
        };
    }
}