tools/pipeline-witness/Azure.Sdk.Tools.PipelineWitness/AzurePipelines/AzurePipelinesProcessor.cs (828 lines of code) (raw):
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Azure.Sdk.Tools.PipelineWitness.Configuration;
using Azure.Sdk.Tools.PipelineWitness.Utilities;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.TeamFoundation.Build.WebApi;
using Microsoft.TeamFoundation.TestManagement.WebApi;
using Microsoft.VisualStudio.Services.TestResults.WebApi;
using Microsoft.VisualStudio.Services.WebApi;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using Newtonsoft.Json.Serialization;
using Attachment = Microsoft.TeamFoundation.Build.WebApi.Attachment;
using Timeline = Microsoft.TeamFoundation.Build.WebApi.Timeline;
namespace Azure.Sdk.Tools.PipelineWitness.AzurePipelines
{
[SuppressMessage("Style", "IDE0037:Use inferred member name", Justification = "Explicit member names are added to json export objects for clarity")]
public class AzurePipelinesProcessor
{
// Names of the blob containers resolved in the constructor.
private const string BuildsContainerName = "builds";
private const string BuildLogLinesContainerName = "buildloglines";
private const string BuildTimelineRecordsContainerName = "buildtimelinerecords";
private const string BuildDefinitionsContainerName = "builddefinitions";
private const string PipelineOwnersContainerName = "pipelineowners";
private const string TestRunsContainerName = "testruns";
private const string TestResultsContainerName = "testrunresults";
// Page size used for the `top` argument (and to compute `skip`) when fetching test results.
private const int ApiBatchSize = 10000;
// Custom date format with literal 'T' and 'Z' characters and seven fractional-second digits.
private const string TimeFormat = @"yyyy-MM-dd\THH:mm:ss.fffffff\Z";
// Shared serializer settings: camelCase property names, camelCase enum strings, no indentation
// (each serialized object is written as a single line of a .jsonl blob).
private static readonly JsonSerializerSettings jsonSettings = new()
{
ContractResolver = new CamelCasePropertyNamesContractResolver(),
Converters = { new StringEnumConverter(new CamelCaseNamingStrategy()) },
Formatting = Formatting.None,
};
private readonly ILogger<AzurePipelinesProcessor> logger;
// Azure DevOps clients obtained from the VssConnection passed to the constructor.
private readonly TestResultsHttpClient testResultsClient;
private readonly BuildHttpClient buildClient;
// One container client per exported record type.
private readonly BlobContainerClient buildLogLinesContainerClient;
private readonly BlobContainerClient buildsContainerClient;
private readonly BlobContainerClient buildTimelineRecordsContainerClient;
private readonly BlobContainerClient testRunsContainerClient;
private readonly BlobContainerClient testResultsContainerClient;
private readonly BlobContainerClient buildDefinitionsContainerClient;
private readonly BlobContainerClient pipelineOwnersContainerClient;
private readonly IOptions<PipelineWitnessSettings> options;
// Maps "{projectId}:{definitionId}" to the definition revision last seen by
// UploadBuildDefinitionBlobsAsync, so unchanged definitions are not uploaded again.
private readonly Dictionary<string, int?> cachedDefinitionRevisions = [];
/// <summary>
/// Creates a processor that reads build data through <paramref name="vssConnection"/> and writes
/// exported records to the containers of <paramref name="blobServiceClient"/>.
/// </summary>
/// <param name="logger">Logger used for progress and error reporting.</param>
/// <param name="blobServiceClient">Storage account client used to resolve the export containers.</param>
/// <param name="vssConnection">Connection used to obtain the build and test results clients.</param>
/// <param name="options">Pipeline witness settings.</param>
/// <exception cref="ArgumentNullException">Any argument is null.</exception>
public AzurePipelinesProcessor(
    ILogger<AzurePipelinesProcessor> logger,
    BlobServiceClient blobServiceClient,
    VssConnection vssConnection,
    IOptions<PipelineWitnessSettings> options)
{
    // Validate every argument before any field is assigned.
    ArgumentNullException.ThrowIfNull(blobServiceClient);
    ArgumentNullException.ThrowIfNull(logger);
    ArgumentNullException.ThrowIfNull(options);
    ArgumentNullException.ThrowIfNull(vssConnection);

    this.logger = logger;
    this.options = options;

    // Each container client is resolved exactly once.
    this.buildsContainerClient = blobServiceClient.GetBlobContainerClient(BuildsContainerName);
    this.buildTimelineRecordsContainerClient = blobServiceClient.GetBlobContainerClient(BuildTimelineRecordsContainerName);
    this.buildLogLinesContainerClient = blobServiceClient.GetBlobContainerClient(BuildLogLinesContainerName);
    this.buildDefinitionsContainerClient = blobServiceClient.GetBlobContainerClient(BuildDefinitionsContainerName);
    this.testRunsContainerClient = blobServiceClient.GetBlobContainerClient(TestRunsContainerName);
    this.testResultsContainerClient = blobServiceClient.GetBlobContainerClient(TestResultsContainerName);
    this.pipelineOwnersContainerClient = blobServiceClient.GetBlobContainerClient(PipelineOwnersContainerName);

    this.buildClient = vssConnection.GetClient<EnhancedBuildHttpClient>();
    this.testResultsClient = vssConnection.GetClient<TestResultsHttpClient>();
}
/// <summary>
/// Fetches a build and uploads its test run, timeline, log line, pipeline owners and build blobs.
/// </summary>
/// <remarks>
/// Every failed precondition is logged before returning, so one pass reports all problems with the build.
/// The build blob is uploaded last so its existence can be used as a signal that processing completed.
/// </remarks>
/// <param name="account">Organization name written into every exported record.</param>
/// <param name="projectId">Id of the project that owns the build.</param>
/// <param name="buildId">Id of the build to export.</param>
public async Task UploadBuildBlobsAsync(string account, Guid projectId, int buildId)
{
    Build build = await GetBuildAsync(projectId, buildId);

    if (build == null)
    {
        this.logger.LogWarning("Unable to process run due to missing build. Project: {Project}, BuildId: {BuildId}", projectId, buildId);
        return;
    }

    bool skipBuild = false;

    // Project name is used in blob paths and cannot be empty
    if (build.Project == null)
    {
        this.logger.LogWarning("Skipping build with null project property. Project: {Project}, BuildId: {BuildId}", projectId, buildId);
        skipBuild = true;
    }
    else if (string.IsNullOrWhiteSpace(build.Project.Name))
    {
        // The project object exists here; it is the name that is missing.
        this.logger.LogWarning("Skipping build with empty project name. Project: {Project}, BuildId: {BuildId}", projectId, buildId);
        skipBuild = true;
    }

    if (build.Deleted)
    {
        this.logger.LogInformation("Skipping deleted build. Project: {Project}, BuildId: {BuildId}", build.Project?.Name, buildId);
        skipBuild = true;
    }

    if (build.StartTime == null)
    {
        this.logger.LogWarning("Skipping build with null start time. Project: {Project}, BuildId: {BuildId}", build.Project?.Name, buildId);
        skipBuild = true;
    }

    // FinishTime is used in blob paths and cannot be null
    if (build.FinishTime == null)
    {
        this.logger.LogWarning("Skipping build with null finish time. Project: {Project}, BuildId: {BuildId}", build.Project?.Name, buildId);
        skipBuild = true;
    }

    // QueueTime is used in blob paths and cannot be null
    if (build.QueueTime == null)
    {
        this.logger.LogWarning("Skipping build with null queue time. Project: {Project}, BuildId: {BuildId}", build.Project?.Name, buildId);
        skipBuild = true;
    }

    if (build.Definition == null)
    {
        this.logger.LogWarning("Skipping build with null definition property. Project: {Project}, BuildId: {BuildId}", build.Project?.Name, buildId);
        skipBuild = true;
    }

    if (skipBuild)
    {
        return;
    }

    await UploadTestRunBlobsAsync(account, build);

    Timeline timeline = await this.buildClient.GetBuildTimelineAsync(projectId, buildId);

    if (timeline == null)
    {
        this.logger.LogWarning("No timeline available for build {Project}: {BuildId}", build.Project.Name, build.Id);
    }
    else
    {
        await UploadTimelineBlobAsync(account, build, timeline);
    }

    List<BuildLog> logs = await this.buildClient.GetBuildLogsAsync(build.Project.Id, build.Id);

    if (logs == null || logs.Count == 0)
    {
        // NOTE(review): returning here also skips the build blob upload below — confirm that is intended.
        this.logger.LogWarning("No logs available for build {Project}: {BuildId}", build.Project.Name, build.Id);
        return;
    }

    List<BuildLogInfo> buildLogInfos = GetBuildLogInfos(build, timeline, logs);

    foreach (BuildLogInfo log in buildLogInfos)
    {
        await UploadLogLinesBlobAsync(account, build, log);
    }

    if (build.Definition.Id == this.options.Value.PipelineOwnersDefinitionId)
    {
        if (timeline == null)
        {
            // The owners blob name is derived from the timeline change id, so it cannot be built without a timeline.
            this.logger.LogWarning("Skipping pipeline owners blob because no timeline is available for build {Project}: {BuildId}", build.Project.Name, build.Id);
        }
        else
        {
            await UploadPipelineOwnersBlobAsync(account, build, timeline);
        }
    }

    // We upload the build blob last. This allows us to use the existence of the blob as a signal that build processing is complete.
    await UploadBuildBlobAsync(account, build);
}
/// <summary>
/// Lists the names of all build blobs for a project whose date folder falls between the UTC days of
/// <paramref name="minTime"/> and <paramref name="maxTime"/>, inclusive.
/// </summary>
/// <param name="projectName">Project name used as the first path segment of the blob prefix.</param>
/// <param name="minTime">Start of the range; only its UTC calendar day is used.</param>
/// <param name="maxTime">End of the range; only its UTC calendar day is used.</param>
/// <param name="cancellationToken">Token passed to the blob listing calls.</param>
/// <returns>The matching blob names, or an empty array when the range is inverted.</returns>
public async Task<string[]> GetBuildBlobNamesAsync(string projectName, DateTimeOffset minTime, DateTimeOffset maxTime, CancellationToken cancellationToken)
{
    // Work with plain UTC calendar dates. Converting an unspecified-kind DateTime back to a
    // DateTimeOffset would apply the local offset, and a daylight-saving change inside the range
    // would then make the day count come out one short.
    DateTime minDay = minTime.UtcDateTime.Date;
    DateTime maxDay = maxTime.UtcDateTime.Date;
    int dayCount = (maxDay - minDay).Days + 1;

    List<string> blobNames = [];

    for (int offset = 0; offset < dayCount; offset++)
    {
        DateTime day = minDay.AddDays(offset);

        // Blob names are written lower-cased (see GetBuildBlobName), so the prefix is lower-cased as well.
        string blobPrefix = $"{projectName}/{day:yyyy/MM/dd}/".ToLower();

        await foreach (BlobItem blob in this.buildsContainerClient.GetBlobsAsync(prefix: blobPrefix, cancellationToken: cancellationToken))
        {
            blobNames.Add(blob.Name);
        }
    }

    return [.. blobNames];
}
/// <summary>
/// Builds the lower-cased blob name for a build:
/// project name, finish date folder, then "{buildId}-{lastChangedUnixSeconds}.jsonl".
/// </summary>
/// <param name="build">Build whose project, finish time, id and last changed date form the name.</param>
/// <returns>The lower-cased blob name.</returns>
public string GetBuildBlobName(Build build)
{
    DateTimeOffset lastChanged = build.LastChangedDate;
    long lastChangedSeconds = lastChanged.ToUnixTimeSeconds();

    return $"{build.Project.Name}/{build.FinishTime:yyyy/MM/dd}/{build.Id}-{lastChangedSeconds}.jsonl".ToLower();
}
/// <summary>
/// Reads the pipeline owners artifact of a build and uploads one JSON line per build definition
/// to the pipeline owners container.
/// </summary>
/// <param name="account">Organization name written into every record.</param>
/// <param name="build">Build that produced the owners artifact.</param>
/// <param name="timeline">Timeline of the build; its change id is part of the blob name.</param>
private async Task UploadPipelineOwnersBlobAsync(string account, Build build, Timeline timeline)
{
    try
    {
        string blobPath = $"{build.Project.Name}/{build.FinishTime:yyyy/MM/dd}/{build.Id}-{timeline.ChangeId}.jsonl".ToLower();
        BlobClient blobClient = this.pipelineOwnersContainerClient.GetBlobClient(blobPath);

        if (await blobClient.ExistsAsync())
        {
            this.logger.LogInformation("Skipping existing owners blob for build {BuildId}", build.Id);
            return;
        }

        Dictionary<int, string[]> owners = await GetOwnersFromBuildArtifactAsync(build);

        if (owners == null)
        {
            // no need to log anything here. GetOwnersFromBuildArtifactAsync logs a warning before returning null;
            return;
        }

        // The first argument is the build id, so the placeholder is named accordingly.
        this.logger.LogInformation("Creating owners blob for build {BuildId} change {ChangeId}", build.Id, timeline.ChangeId);

        StringBuilder stringBuilder = new();

        foreach (KeyValuePair<int, string[]> owner in owners)
        {
            string contentLine = JsonConvert.SerializeObject(new
            {
                OrganizationName = account,
                BuildDefinitionId = owner.Key,
                Owners = owner.Value,
                Timestamp = new DateTimeOffset(build.FinishTime!.Value).ToUniversalTime(),
                EtlIngestDate = DateTimeOffset.UtcNow
            }, jsonSettings);

            stringBuilder.AppendLine(contentLine);
        }

        await blobClient.UploadAsync(new BinaryData(stringBuilder.ToString()));
    }
    catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.Conflict)
    {
        // Another writer created the blob between the existence check and the upload.
        this.logger.LogInformation("Ignoring exception from existing owners blob for build {BuildId}", build.Id);
    }
    catch (Exception ex)
    {
        this.logger.LogError(ex, "Error processing owners artifact from build {BuildId}", build.Id);
        throw;
    }
}
/// <summary>
/// Downloads the configured owners artifact of a build as a zip and deserializes the configured
/// file inside it into a map of build definition id to owner names.
/// </summary>
/// <param name="build">Build whose artifact is read.</param>
/// <returns>
/// The owners map, or null when the artifact, the file or its content is missing or the JSON
/// cannot be deserialized. A warning is logged in each of those cases.
/// </returns>
/// <exception cref="InvalidDataException">The artifact could not be read as a zip archive; rethrown so the work can be retried.</exception>
private async Task<Dictionary<int, string[]>> GetOwnersFromBuildArtifactAsync(Build build)
{
    string artifactName = this.options.Value.PipelineOwnersArtifactName;
    string filePath = this.options.Value.PipelineOwnersFilePath;

    try
    {
        await using Stream artifactStream = await this.buildClient.GetArtifactContentZipAsync(build.Project.Id, build.Id, artifactName);
        using ZipArchive zip = new(artifactStream);

        ZipArchiveEntry fileEntry = zip.GetEntry(filePath);

        if (fileEntry == null)
        {
            this.logger.LogWarning("Artifact {ArtifactName} in build {BuildId} didn't contain the expected file {FilePath}", artifactName, build.Id, filePath);
            return null;
        }

        await using Stream contentStream = fileEntry.Open();
        using StreamReader contentReader = new(contentStream);
        string content = await contentReader.ReadToEndAsync();

        if (string.IsNullOrEmpty(content))
        {
            this.logger.LogWarning("The file {filePath} in artifact {ArtifactName} in build {BuildId} contained no content", filePath, artifactName, build.Id);
            return null;
        }

        Dictionary<int, string[]> ownersDictionary = JsonConvert.DeserializeObject<Dictionary<int, string[]>>(content);

        if (ownersDictionary == null)
        {
            this.logger.LogWarning("The file {filePath} in artifact {ArtifactName} in build {BuildId} contained a null json object", filePath, artifactName, build.Id);
        }

        return ownersDictionary;
    }
    catch (ArtifactNotFoundException ex)
    {
        this.logger.LogWarning(ex, "Build {BuildId} did not contain the expected artifact {ArtifactName}", build.Id, artifactName);
    }
    catch (InvalidDataException ex)
    {
        this.logger.LogWarning(ex, "Unable to read ZIP contents from artifact {ArtifactName} in build {BuildId}", artifactName, build.Id);

        // rethrow the exception so the queue message will be retried.
        throw;
    }
    catch (JsonException ex)
    {
        // JsonException is the common base of JsonSerializationException and JsonReaderException,
        // so malformed JSON is handled the same way as JSON of the wrong shape.
        this.logger.LogWarning(ex, "Problem deserializing JSON from artifact {ArtifactName} in build {BuildId}", artifactName, build.Id);
    }

    return null;
}
/// <summary>
/// Uploads a blob for every build definition of a project whose revision differs from the
/// revision recorded in the in-memory cache, then records the current revision.
/// </summary>
/// <param name="account">Organization name written into every record.</param>
/// <param name="projectName">Project whose definitions are queried.</param>
public async Task UploadBuildDefinitionBlobsAsync(string account, string projectName)
{
    // NOTE(review): only the list returned by this single call is processed; confirm whether
    // a continuation token needs to be followed for large projects.
    IPagedList<BuildDefinition> definitionPage = await this.buildClient.GetFullDefinitionsAsync2(project: projectName);

    foreach (BuildDefinition current in definitionPage)
    {
        string revisionKey = $"{current.Project.Id}:{current.Id}";

        bool seenBefore = this.cachedDefinitionRevisions.TryGetValue(revisionKey, out int? lastRevision);
        bool unchanged = seenBefore && lastRevision == current.Revision;

        if (!unchanged)
        {
            await UploadBuildDefinitionBlobAsync(account, current);
        }

        this.cachedDefinitionRevisions[revisionKey] = current.Revision;
    }
}
/// <summary>
/// Serializes one build definition to a single JSON line and uploads it to the build definitions
/// container as "{project}/{definitionId}-{revision}.jsonl", unless that blob already exists.
/// </summary>
/// <param name="account">Organization name written into the record and used in the definition url.</param>
/// <param name="definition">Definition to export.</param>
private async Task UploadBuildDefinitionBlobAsync(string account, BuildDefinition definition)
{
    var project = definition.Project;
    string blobPath = $"{project.Name}/{definition.Id}-{definition.Revision}.jsonl".ToLower();

    try
    {
        BlobClient blobClient = this.buildDefinitionsContainerClient.GetBlobClient(blobPath);

        bool alreadyExported = await blobClient.ExistsAsync();
        if (alreadyExported)
        {
            this.logger.LogInformation("Skipping existing build definition blob for build {DefinitionId} project {Project}", definition.Id, project.Name);
            return;
        }

        this.logger.LogInformation("Creating blob for build definition {DefinitionId} revision {Revision} project {Project}", definition.Id, definition.Revision, project.Name);

        var repository = definition.Repository;
        var queue = definition.Queue;
        var pool = queue?.Pool;

        // Property order is kept as-is because it determines the order of the emitted JSON fields.
        var record = new
        {
            OrganizationName = account,
            ProjectId = project.Id,
            BuildDefinitionId = definition.Id,
            BuildDefinitionRevision = definition.Revision,
            BuildDefinitionName = definition.Name,
            Path = definition.Path,
            RepositoryId = repository.Id,
            RepositoryName = repository.Name,
            CreatedDate = definition.CreatedDate,
            DefaultBranch = repository.DefaultBranch,
            ProjectName = project.Name,
            ProjectRevision = project.Revision,
            ProjectState = project.State,
            Quality = definition.DefinitionQuality,
            QueueId = queue?.Id,
            QueueName = queue?.Name,
            QueuePoolId = pool?.Id,
            QueuePoolIsHosted = pool?.IsHosted,
            QueuePoolName = pool?.Name,
            QueueStatus = definition.QueueStatus,
            Type = definition.Type,
            Url = $"https://dev.azure.com/{account}/{project.Name}/_build?definitionId={definition.Id}",
            BadgeEnabled = definition.BadgeEnabled,
            BuildNumberFormat = definition.BuildNumberFormat,
            Comment = definition.Comment,
            JobAuthorizationScope = definition.JobAuthorizationScope,
            JobCancelTimeoutInMinutes = definition.JobCancelTimeoutInMinutes,
            JobTimeoutInMinutes = definition.JobTimeoutInMinutes,
            ProcessType = definition.Process.Type,
            // Only YAML processes carry a file name; every other process type exports null.
            ProcessYamlFilename = (definition.Process as YamlProcess)?.YamlFilename,
            RepositoryCheckoutSubmodules = repository.CheckoutSubmodules,
            RepositoryClean = repository.Clean,
            RepositoryDefaultBranch = repository.DefaultBranch,
            RepositoryRootFolder = repository.RootFolder,
            RepositoryType = repository.Type,
            RepositoryUrl = repository.Url,
            Options = definition.Options,
            Variables = definition.Variables,
            Tags = definition.Tags,
            Triggers = definition.Triggers,
            EtlIngestDate = DateTimeOffset.UtcNow
        };

        string json = JsonConvert.SerializeObject(record, jsonSettings);

        await blobClient.UploadAsync(new BinaryData(json));
    }
    catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.Conflict)
    {
        // The blob appeared between the existence check and the upload.
        this.logger.LogInformation("Ignoring exception from existing blob for build definition {DefinitionId}", definition.Id);
    }
    catch (Exception ex)
    {
        this.logger.LogError(ex, "Error processing blob for build definition {DefinitionId}", definition.Id);
        throw;
    }
}
/// <summary>
/// Pairs every build log with its timeline record (when one exists) and drops job logs that only
/// repeat the lines of their child task logs.
/// </summary>
/// <param name="build">Build the logs belong to; used for log messages.</param>
/// <param name="timeline">Timeline of the build. May be null or have null records, in which case no log is matched to a record.</param>
/// <param name="logs">Logs of the build.</param>
/// <returns>One entry per log that should be uploaded.</returns>
private List<BuildLogInfo> GetBuildLogInfos(Build build, Timeline timeline, List<BuildLog> logs)
{
    // A build can have logs without a timeline; treat that as "no records" instead of failing.
    IEnumerable<TimelineRecord> records = timeline?.Records ?? Enumerable.Empty<TimelineRecord>();

    Dictionary<int, BuildLog> logsById = logs.ToDictionary(l => l.Id);

    // Index the records by log id once instead of scanning all records for every log.
    ILookup<int, TimelineRecord> recordsByLogId = records
        .Where(x => x.Log != null)
        .ToLookup(x => x.Log.Id);

    List<BuildLogInfo> buildLogInfos = [];

    foreach (BuildLog log in logs)
    {
        IEnumerable<TimelineRecord> logRecords = recordsByLogId[log.Id];

        if (logRecords.Skip(1).Any())
        {
            this.logger.LogWarning("Found multiple timeline records for build {BuildId}, log {LogId}", build.Id, log.Id);
        }

        TimelineRecord logRecord = logRecords.FirstOrDefault();

        // Job logs are typically just a duplication of their child task logs with the addition of extra start and end lines.
        // If we can, we skip the redundant lines.
        if (string.Equals(logRecord?.RecordType, "job", StringComparison.OrdinalIgnoreCase))
        {
            // sum the line counts of all child task records that have a known log
            long childLineCount = 0;

            foreach (TimelineRecord childRecord in records.Where(x => x.ParentId == logRecord.Id))
            {
                if (childRecord.Log != null && logsById.TryGetValue(childRecord.Log.Id, out BuildLog childLog))
                {
                    childLineCount += childLog.LineCount;
                }
            }

            // if the job's line count is the task line count + 2, then we can skip the job log
            if (log.LineCount == childLineCount + 2)
            {
                this.logger.LogTrace("Skipping redundant logs for build {BuildId}, job {RecordId}, log {LogId}", build.Id, logRecord.Id, log.Id);
                continue;
            }
        }

        buildLogInfos.Add(new BuildLogInfo
        {
            LogId = log.Id,
            LineCount = log.LineCount,
            LogCreatedOn = log.CreatedOn,
            RecordId = logRecord?.Id,
            ParentRecordId = logRecord?.ParentId,
            RecordType = logRecord?.RecordType
        });
    }

    return buildLogInfos;
}
/// <summary>
/// Serializes a build to a single JSON line and uploads it to the builds container under the name
/// produced by GetBuildBlobName, unless that blob already exists.
/// </summary>
/// <param name="account">Organization name written into the record and used in the build url.</param>
/// <param name="build">Build to export.</param>
private async Task UploadBuildBlobAsync(string account, Build build)
{
    try
    {
        string blobName = GetBuildBlobName(build);
        BlobClient blobClient = this.buildsContainerClient.GetBlobClient(blobName);

        bool alreadyExported = await blobClient.ExistsAsync();
        if (alreadyExported)
        {
            this.logger.LogInformation("Skipping existing build blob for build {BuildId}", build.Id);
            return;
        }

        var project = build.Project;
        var definition = build.Definition;
        var definitionProject = definition?.Project;
        var repository = build.Repository;
        var queue = build.Queue;
        var pool = queue?.Pool;

        // Tags are stored as a nested JSON string, or null when the build has none.
        string serializedTags = null;
        if (build.Tags?.Count > 0)
        {
            serializedTags = JsonConvert.SerializeObject(build.Tags, jsonSettings);
        }

        // Property order is kept as-is because it determines the order of the emitted JSON fields.
        var record = new
        {
            OrganizationName = account,
            ProjectId = project?.Id,
            BuildId = build.Id,
            DefinitionId = definition?.Id,
            RepositoryId = repository?.Id,
            BuildNumber = build.BuildNumber,
            BuildNumberRevision = build.BuildNumberRevision,
            DefinitionName = definition?.Name,
            DefinitionPath = definition?.Path,
            DefinitionProjectId = definitionProject?.Id,
            DefinitionProjectName = definitionProject?.Name,
            DefinitionProjectRevision = definitionProject?.Revision,
            DefinitionProjectState = definitionProject?.State,
            DefinitionRevision = definition?.Revision,
            DefinitionType = definition?.Type,
            QueueTime = build.QueueTime,
            StartTime = build.StartTime,
            FinishTime = build.FinishTime,
            LastChangedDate = build.LastChangedDate,
            LogsId = build.Logs?.Id,
            LogsType = build.Logs?.Type,
            OrchestrationPlanId = build.OrchestrationPlan?.PlanId,
            Parameters = build.Parameters,
            PlanId = build.Plans?.FirstOrDefault()?.PlanId,
            Priority = build.Priority,
            ProjectName = project?.Name,
            ProjectRevision = project?.Revision,
            ProjectState = project?.State,
            QueueId = queue?.Id,
            QueueName = queue?.Name,
            QueuePoolId = pool?.Id,
            QueuePoolName = pool?.Name,
            Reason = build.Reason,
            RepositoryCheckoutSubmodules = repository?.CheckoutSubmodules,
            RepositoryType = repository?.Type,
            Result = build.Result,
            RetainedByRelease = build.RetainedByRelease,
            SourceBranch = build.SourceBranch,
            SourceVersion = build.SourceVersion,
            Status = build.Status,
            Tags = serializedTags,
            Url = $"https://dev.azure.com/{account}/{project!.Name}/_build/results?buildId={build.Id}",
            ValidationResults = build.ValidationResults,
            EtlIngestDate = DateTime.UtcNow,
        };

        string json = JsonConvert.SerializeObject(record, jsonSettings);

        await blobClient.UploadAsync(new BinaryData(json));
    }
    catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.Conflict)
    {
        // The blob appeared between the existence check and the upload.
        this.logger.LogInformation("Ignoring exception from existing blob for build {BuildId}", build.Id);
    }
    catch (Exception ex)
    {
        this.logger.LogError(ex, "Error processing build blob for build {BuildId}", build.Id);
        throw;
    }
}
/// <summary>
/// Writes every record of a build timeline as one JSON line and uploads the result to the
/// timeline records container, unless the timeline has no records or the blob already exists.
/// </summary>
/// <param name="account">Organization name written into every record.</param>
/// <param name="build">Build the timeline belongs to.</param>
/// <param name="timeline">Timeline to export; its change id is part of the blob name.</param>
private async Task UploadTimelineBlobAsync(string account, Build build, Timeline timeline)
{
    try
    {
        if (timeline.Records == null)
        {
            this.logger.LogInformation("Skipping timeline with null Records property for build {BuildId}", build.Id);
            return;
        }

        string blobPath = $"{build.Project.Name}/{build.FinishTime:yyyy/MM/dd}/{build.Id}-{timeline.ChangeId}.jsonl".ToLower();
        BlobClient blobClient = this.buildTimelineRecordsContainerClient.GetBlobClient(blobPath);

        bool alreadyExported = await blobClient.ExistsAsync();
        if (alreadyExported)
        {
            this.logger.LogInformation("Skipping existing timeline for build {BuildId}, change {ChangeId}", build.Id, timeline.ChangeId);
            return;
        }

        StringBuilder lines = new();

        foreach (TimelineRecord record in timeline.Records)
        {
            // Issues are stored as a nested JSON string, or null when the record has none.
            string serializedIssues = null;
            if (record.Issues?.Count > 0)
            {
                serializedIssues = JsonConvert.SerializeObject(record.Issues, jsonSettings);
            }

            // Property order is kept as-is because it determines the order of the emitted JSON fields.
            var row = new
            {
                OrganizationName = account,
                ProjectId = build.Project?.Id,
                ProjectName = build.Project?.Name,
                BuildDefinitionId = build.Definition?.Id,
                BuildDefinitionPath = build.Definition?.Path,
                BuildDefinitionName = build.Definition?.Name,
                BuildId = build.Id,
                BuildTimelineId = timeline.Id,
                RepositoryId = build.Repository?.Id,
                RecordId = record.Id,
                ParentId = record.ParentId,
                ChangeId = timeline.ChangeId,
                LastChangedBy = timeline.LastChangedBy,
                LastChangedOn = timeline.LastChangedOn,
                RecordChangeId = record.ChangeId,
                DetailsChangeId = record.Details?.ChangeId,
                DetailsId = record.Details?.Id,
                WorkerName = record.WorkerName,
                RecordName = record.Name,
                Order = record.Order,
                StartTime = record.StartTime,
                FinishTime = record.FinishTime,
                WarningCount = record.WarningCount,
                ErrorCount = record.ErrorCount,
                LogId = record.Log?.Id,
                LogType = record.Log?.Type,
                PercentComplete = record.PercentComplete,
                Result = record.Result,
                State = record.State,
                TaskId = record.Task?.Id,
                TaskName = record.Task?.Name,
                TaskVersion = record.Task?.Version,
                Type = record.RecordType,
                Issues = serializedIssues,
                EtlIngestDate = DateTime.UtcNow,
            };

            lines.AppendLine(JsonConvert.SerializeObject(row, jsonSettings));
        }

        await blobClient.UploadAsync(new BinaryData(lines.ToString()));
    }
    catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.Conflict)
    {
        // The blob appeared between the existence check and the upload.
        this.logger.LogInformation("Ignoring exception from existing timeline blob for build {BuildId}, change {ChangeId}", build.Id, timeline.ChangeId);
    }
    catch (Exception ex)
    {
        this.logger.LogError(ex, "Error processing timeline for build {BuildId}", build.Id);
        throw;
    }
}
/// <summary>
/// Streams one build log line by line, converts every line to a JSON record and writes it to a
/// blob in the build log lines container, unless that blob already exists.
/// </summary>
/// <param name="account">Organization name written into every record.</param>
/// <param name="build">Build the log belongs to.</param>
/// <param name="log">Log to export.</param>
private async Task UploadLogLinesBlobAsync(string account, Build build, BuildLogInfo log)
{
    try
    {
        // we don't use FinishTime in the logs blob path to prevent duplicating logs when processing retries.
        // i.e. logs with a given buildid/logid are immutable and retries only add new logs.
        string blobPath = $"{build.Project.Name}/{build.QueueTime:yyyy/MM/dd}/{build.Id}-{log.LogId}.jsonl".ToLower();
        BlobClient blobClient = this.buildLogLinesContainerClient.GetBlobClient(blobPath);

        if (await blobClient.ExistsAsync())
        {
            this.logger.LogInformation("Skipping existing log for build {BuildId}, record {RecordId}, log {LogId}", build.Id, log.RecordId, log.LogId);
            return;
        }

        this.logger.LogInformation("Processing log for build {BuildId}, record {RecordId}, log {LogId}", build.Id, log.RecordId, log.LogId);

        int lineNumber = 0;
        int characterCount = 0;

        // Over an open read stream and an open write stream, one line at a time, read, process, and write to
        // blob storage
        // NOTE(review): if reading fails part-way, disposing the write stream may still leave a partial blob
        // that a later retry would treat as existing — confirm against the storage client's behaviour.
        using (Stream logStream = await this.buildClient.GetBuildLogAsync(build.Project.Name, build.Id, log.LogId))
        using (StreamReader logReader = new(logStream))
        // The write side is disposed asynchronously so the final flush does not block a thread.
        await using (Stream blobStream = await blobClient.OpenWriteAsync(overwrite: true, new BlobOpenWriteOptions()))
        await using (StreamWriter blobWriter = new(blobStream))
        {
            // Lines without a parsable timestamp reuse the timestamp of the previous line.
            DateTimeOffset lastTimeStamp = log.LogCreatedOn ?? build.StartTime!.Value;

            while (true)
            {
                string line = await logReader.ReadLineAsync();

                if (line == null)
                {
                    break;
                }

                lineNumber += 1;
                characterCount += line.Length;

                var (timestamp, message) = StringUtilities.ParseLogLine(line, lastTimeStamp);
                lastTimeStamp = timestamp;

                // TimeFormat ends in a literal 'Z', so the value must be expressed in UTC before formatting.
                DateTimeOffset utcTimestamp = lastTimeStamp.ToUniversalTime();

                await blobWriter.WriteLineAsync(JsonConvert.SerializeObject(new
                {
                    OrganizationName = account,
                    ProjectId = build.Project.Id,
                    ProjectName = build.Project.Name,
                    BuildDefinitionId = build.Definition.Id,
                    BuildDefinitionPath = build.Definition.Path,
                    BuildDefinitionName = build.Definition.Name,
                    BuildId = build.Id,
                    LogId = log.LogId,
                    LineNumber = lineNumber,
                    Length = message.Length,
                    Timestamp = utcTimestamp.ToString(TimeFormat),
                    Message = message,
                    EtlIngestDate = DateTime.UtcNow.ToString(TimeFormat),
                }, jsonSettings));
            }
        }

        this.logger.LogInformation("Processed {CharacterCount} characters and {LineCount} lines for build {BuildId}, record {RecordId}, log {LogId}", characterCount, lineNumber, build.Id, log.RecordId, log.LogId);
    }
    catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.Conflict)
    {
        this.logger.LogInformation("Ignoring existing blob exception for build {BuildId}, record {RecordId}, log {LogId}", build.Id, log.RecordId, log.LogId);
    }
    catch (Exception ex)
    {
        this.logger.LogError(ex, "Error processing build {BuildId}, record {RecordId}, log {LogId}", build.Id, log.RecordId, log.LogId);
        throw;
    }
}
/// <summary>
/// Queries the test runs of a build, from one hour before its queue time to one hour after its
/// finish time, and uploads a test run blob and a test results blob for each run.
/// </summary>
/// <param name="account">Organization name written into every record.</param>
/// <param name="build">Build whose test runs are exported; queue and finish time must be set.</param>
private async Task UploadTestRunBlobsAsync(string account, Build build)
{
    try
    {
        string pageToken = string.Empty;

        DateTime windowStart = build.QueueTime!.Value.AddHours(-1);
        DateTime windowEnd = build.FinishTime!.Value.AddHours(1);

        for (DateTime chunkStart = windowStart; chunkStart < windowEnd;)
        {
            // Ado limits test run queries to a 7 day range, so we'll chunk on 6 days.
            DateTime sixDaysLater = chunkStart.AddDays(6);
            DateTime chunkEnd = sixDaysLater > windowEnd ? windowEnd : sixDaysLater;

            // Follow continuation tokens until the current chunk is exhausted.
            do
            {
                IPagedList<TestRun> runs = await this.testResultsClient.QueryTestRunsAsync2(
                    build.Project.Id,
                    chunkStart,
                    chunkEnd,
                    continuationToken: pageToken,
                    buildIds: [ build.Id ]
                );

                foreach (TestRun run in runs)
                {
                    await UploadTestRunBlobAsync(account, build, run);
                    await UploadTestRunResultBlobAsync(account, build, run);
                }

                pageToken = runs.ContinuationToken;
            }
            while (!string.IsNullOrEmpty(pageToken));

            chunkStart = chunkEnd;
        }
    }
    catch (Exception ex)
    {
        this.logger.LogError(ex, "Error processing test runs for build {BuildId}", build.Id);
        throw;
    }
}
/// <summary>
/// Serializes one test run, including per-outcome result counts, to a single JSON line and uploads
/// it to the test runs container, unless that blob already exists.
/// </summary>
/// <param name="account">Organization name written into the record.</param>
/// <param name="build">Build the test run belongs to.</param>
/// <param name="testRun">Test run to export.</param>
private async Task UploadTestRunBlobAsync(string account, Build build, TestRun testRun)
{
    try
    {
        string blobPath = $"{build.Project.Name}/{testRun.CompletedDate:yyyy/MM/dd}/{testRun.Id}.jsonl".ToLower();
        BlobClient blobClient = this.testRunsContainerClient.GetBlobClient(blobPath);

        if (await blobClient.ExistsAsync())
        {
            this.logger.LogInformation("Skipping existing test run blob for build {BuildId}, test run {RunId}", build.Id, testRun.Id);
            return;
        }

        // Tolerate missing statistics, entries without an outcome, and several entries sharing an
        // outcome (their counts are added together) instead of throwing while building the map.
        Dictionary<string, int> stats = testRun.RunStatistics?
            .Where(x => x.Outcome != null)
            .GroupBy(x => x.Outcome, x => x.Count)
            .ToDictionary(g => g.Key, g => g.Sum())
            ?? new Dictionary<string, int>();

        // Outcomes that were not reported count as zero.
        int CountOf(string outcome) => stats.TryGetValue(outcome, out int count) ? count : 0;

        string content = JsonConvert.SerializeObject(new
        {
            OrganizationName = account,
            ProjectId = build.Project?.Id,
            ProjectName = build.Project?.Name,
            BuildDefinitionId = build.Definition?.Id,
            BuildDefinitionPath = build.Definition?.Path,
            BuildDefinitionName = build.Definition?.Name,
            BuildId = build.Id,
            TestRunId = testRun.Id,
            Title = testRun.Name,
            StartedDate = testRun.StartedDate,
            CompletedDate = testRun.CompletedDate,
            ResultDurationSeconds = 0,
            RunDurationSeconds = (int)testRun.CompletedDate.Subtract(testRun.StartedDate).TotalSeconds,
            BranchName = build.SourceBranch,
            HasDetail = default(bool?),
            IsAutomated = testRun.IsAutomated,
            ResultAbortedCount = CountOf("Aborted"),
            ResultBlockedCount = CountOf("Blocked"),
            ResultCount = testRun.TotalTests,
            ResultErrorCount = CountOf("Error"),
            ResultFailCount = CountOf("Failed"),
            ResultInconclusiveCount = CountOf("Inconclusive"),
            ResultNoneCount = CountOf("None"),
            ResultNotApplicableCount = CountOf("NotApplicable"),
            ResultNotExecutedCount = CountOf("NotExecuted"),
            ResultNotImpactedCount = CountOf("NotImpacted"),
            ResultPassCount = CountOf("Passed"),
            ResultTimeoutCount = CountOf("Timeout"),
            ResultWarningCount = CountOf("Warning"),
            TestRunType = testRun.IsAutomated ? "Automated" : "Manual",
            Workflow = !string.IsNullOrEmpty(testRun.Build?.Id) || testRun.BuildConfiguration != null ? "Build"
                : testRun.Release?.Id > 0 ? "Release"
                : "",
            OrganizationId = default(string),
            EtlIngestDate = DateTime.UtcNow,
        }, jsonSettings);

        await blobClient.UploadAsync(new BinaryData(content));
    }
    catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.Conflict)
    {
        // The blob appeared between the existence check and the upload.
        this.logger.LogInformation("Ignoring exception from existing blob for test run {RunId} for build {BuildId}", testRun.Id, build.Id);
    }
    catch (Exception ex)
    {
        this.logger.LogError(ex, "Error processing test run blob for build {BuildId}, test run {RunId}", build.Id, testRun.Id);
        throw;
    }
}
/// <summary>
/// Computes how many batches of <paramref name="batchSize"/> items are needed to cover
/// <paramref name="startingNumber"/> items.
/// </summary>
/// <param name="startingNumber">Total number of items. Zero or negative values need no batches.</param>
/// <param name="batchSize">Maximum number of items per batch; must be positive when there are items to batch.</param>
/// <returns>The number of batches, i.e. <paramref name="startingNumber"/> divided by <paramref name="batchSize"/>, rounded up.</returns>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="batchSize"/> is zero or negative while <paramref name="startingNumber"/> is positive.
/// </exception>
public static int CalculateBatches(int startingNumber, int batchSize = ApiBatchSize)
{
    // Nothing to fetch. This is checked first so a zero count never fails, whatever the batch size.
    if (startingNumber <= 0)
    {
        return 0;
    }

    if (batchSize <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "Batch size must be greater than zero.");
    }

    // Integer ceiling division; this form cannot overflow for positive inputs.
    return ((startingNumber - 1) / batchSize) + 1;
}
/// <summary>
/// Fetches the results of a test run in batches, writes each result as one JSON line and uploads
/// the lines to the test results container, unless that blob already exists.
/// </summary>
/// <param name="account">Organization name written into every record.</param>
/// <param name="build">Build the test run belongs to.</param>
/// <param name="testRun">Test run whose results are exported.</param>
private async Task UploadTestRunResultBlobAsync(string account, Build build, TestRun testRun)
{
    try
    {
        string blobPath = $"{build.Project.Name}/{testRun.CompletedDate:yyyy/MM/dd}/{testRun.Id}.jsonl".ToLower();
        BlobClient blobClient = this.testResultsContainerClient.GetBlobClient(blobPath);

        if (await blobClient.ExistsAsync())
        {
            this.logger.LogInformation("Skipping existing test results blob for build {BuildId}, test run {RunId}", build.Id, testRun.Id);
            return;
        }

        StringBuilder builder = new();
        int batchCount = AzurePipelinesProcessor.CalculateBatches(testRun.TotalTests, batchSize: ApiBatchSize);

        for (int batchMultiplier = 0; batchMultiplier < batchCount; batchMultiplier++)
        {
            // Each request skips the results already fetched by the previous batches.
            List<TestCaseResult> data = await this.testResultsClient.GetTestResultsAsync(build.Project!.Id, testRun.Id, top: ApiBatchSize, skip: batchMultiplier * ApiBatchSize);

            foreach (TestCaseResult record in data)
            {
                builder.AppendLine(JsonConvert.SerializeObject(
                    new
                    {
                        OrganizationName = account,
                        ProjectId = build.Project?.Id,
                        ProjectName = build.Project?.Name,
                        BuildDefinitionId = build.Definition?.Id,
                        BuildDefinitionPath = build.Definition?.Path,
                        BuildDefinitionName = build.Definition?.Name,
                        BuildId = build.Id,
                        TestRunId = testRun.Id,
                        TestCaseId = record.Id,
                        TestCaseReferenceId = record.TestCaseReferenceId,
                        TestCaseTitle = record.TestCaseTitle,
                        Outcome = record.Outcome,
                        Priority = record.Priority,
                        AutomatedTestName = record.AutomatedTestName,
                        AutomatedTestStorageName = record.AutomatedTestStorage,
                        FailingSince = record.FailingSince,
                        FailureType = record.FailureType,
                        StartedDate = record.StartedDate,
                        CompletedDate = record.CompletedDate,
                        EtlIngestDate = DateTime.UtcNow
                    }, jsonSettings)
                );
            }
        }

        await blobClient.UploadAsync(new BinaryData(builder.ToString()));
    }
    catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.Conflict)
    {
        // Same handling as the other upload methods: the blob appeared between the existence check and the upload.
        this.logger.LogInformation("Ignoring exception from existing test results blob for test run {RunId} for build {BuildId}", testRun.Id, build.Id);
    }
    catch (Exception ex)
    {
        this.logger.LogError(ex, "Error processing test results for build {BuildId}, test run {RunId}", build.Id, testRun.Id);
        throw;
    }
}
/// <summary>
/// Fetches a build, translating a "build not found" failure into a null result.
/// </summary>
/// <param name="projectId">Id of the project that owns the build.</param>
/// <param name="buildId">Id of the build to fetch.</param>
/// <returns>The build, or null when the build client reports that it does not exist.</returns>
private async Task<Build> GetBuildAsync(Guid projectId, int buildId)
{
    try
    {
        return await this.buildClient.GetBuildAsync(projectId, buildId);
    }
    catch (BuildNotFoundException)
    {
        // Deliberately not rethrown: a missing build is reported to the caller as null.
        return null;
    }
}
/// <summary>
/// Downloads every "AdditionalTags" attachment of a build, reads each as a JSON string array and
/// adds the combined set of tags to the build.
/// </summary>
/// <param name="account">Organization name; not used by this method.</param>
/// <param name="projectId">Id of the project that owns the build.</param>
/// <param name="buildId">Id of the build to tag.</param>
internal async Task AddAdditionalBuildTagsAsync(string account, Guid projectId, int buildId)
{
    this.logger.LogInformation("Processing build {BuildId} for AdditionalTags attachments", buildId);

    const string attachmentType = "AdditionalTags";

    List<Attachment> attachments = await this.buildClient.GetAttachmentsAsync(projectId, buildId, attachmentType);

    if (attachments == null || attachments.Count == 0)
    {
        // No attachments means no tags to add.
        return;
    }

    HashSet<string> additionalTags = [];

    // Keep only attachments that expose a non-empty "self" link.
    IEnumerable<string> attachmentLinks = attachments
        .Select(x => x.Links?.Links?.TryGetValue("self", out var value) == true ? value : null)
        .OfType<ReferenceLink>()
        .Select(x => x.Href)
        .Where(x => !string.IsNullOrEmpty(x));

    foreach (string attachmentLink in attachmentLinks)
    {
        // The timeline id, record id and attachment name are only available as segments of the link.
        Match match = Regex.Match(attachmentLink, @"https://dev.azure.com/[\w-]+/[\w-]+/_apis/build/builds/\d+/(?<timelineId>[\w-]+)/(?<recordId>[\w-]+)/attachments/[\w\.]+/(?<name>.+)");

        if (!match.Success ||
            !Guid.TryParse(match.Groups["timelineId"].Value, out Guid timelineId) ||
            !Guid.TryParse(match.Groups["recordId"].Value, out Guid recordId))
        {
            // retries won't help here, so we log and continue
            this.logger.LogWarning("Unable to parse attachment link {AttachmentLink}", attachmentLink);
            continue;
        }

        string name = match.Groups["name"].Value;

        this.logger.LogInformation("Downloading AdditionalTags attachment {TimelineId}/{RecordId}/{Name} for build {BuildId}", timelineId, recordId, name, buildId);

        using Stream contentStream = await this.buildClient.GetAttachmentAsync(
            projectId,
            buildId,
            timelineId,
            recordId,
            attachmentType,
            name);

        using StreamReader reader = new(contentStream);

        // Read asynchronously so the download does not block a thread.
        string content = await reader.ReadToEndAsync();

        try
        {
            string[] tags = JsonConvert.DeserializeObject<string[]>(content);

            if (tags == null)
            {
                // Empty or "null" content deserializes to null; there is nothing to add.
                this.logger.LogWarning("AdditionalTags attachment {TimelineId}/{RecordId}/{Name} for build {BuildId} contained no tags", timelineId, recordId, name, buildId);
                continue;
            }

            // Blank entries are not usable as tags.
            additionalTags.UnionWith(tags.Where(tag => !string.IsNullOrWhiteSpace(tag)));
        }
        catch (Exception ex)
        {
            // retries won't help here, so we log and continue
            this.logger.LogError(ex, "Error parsing AdditionalTags attachment {TimelineId}/{RecordId}/{Name} for build {BuildId}", timelineId, recordId, name, buildId);
            continue;
        }
    }

    if (additionalTags.Count != 0)
    {
        this.logger.LogInformation("Adding tags {Tags} to build {BuildId}", JsonConvert.SerializeObject(additionalTags), buildId);
        await this.buildClient.AddBuildTagsAsync(additionalTags, projectId, buildId);
    }
}
}
}