in src/Agent.Worker/JobRunner.cs [43:347]
public async Task<TaskResult> RunAsync(Pipelines.AgentJobRequestMessage message, CancellationToken jobRequestCancellationToken)
{
// Validate parameters.
Trace.Entering();
ArgUtil.NotNull(message, nameof(message));
ArgUtil.NotNull(message.Resources, nameof(message.Resources));
ArgUtil.NotNull(message.Variables, nameof(message.Variables));
ArgUtil.NotNull(message.Steps, nameof(message.Steps));
Trace.Info("Job ID {0}", message.JobId);
DateTime jobStartTimeUtc = DateTime.UtcNow;
ServiceEndpoint systemConnection = message.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
// System.AccessToken
if (message.Variables.ContainsKey(Constants.Variables.System.EnableAccessToken) &&
StringUtil.ConvertToBoolean(message.Variables[Constants.Variables.System.EnableAccessToken].Value))
{
message.Variables[Constants.Variables.System.AccessToken] = new VariableValue(systemConnection.Authorization.Parameters["AccessToken"], false);
}
// back compat TfsServerUrl
message.Variables[Constants.Variables.System.TFServerUrl] = systemConnection.Url.AbsoluteUri;
// Make sure SystemConnection Url and Endpoint Url match Config Url base for OnPremises server
// System.ServerType will always be there after M133
if (!message.Variables.ContainsKey(Constants.Variables.System.ServerType) ||
string.Equals(message.Variables[Constants.Variables.System.ServerType]?.Value, "OnPremises", StringComparison.OrdinalIgnoreCase))
{
ReplaceConfigUriBaseInJobRequestMessage(message);
}
// Setup the job server and job server queue.
var jobServer = HostContext.GetService<IJobServer>();
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
Uri jobServerUrl = systemConnection.Url;
Trace.Info($"Creating job server with URL: {jobServerUrl}");
// jobServerQueue is the throttling reporter.
_jobServerQueue = HostContext.GetService<IJobServerQueue>();
VssConnection jobConnection = VssUtil.CreateConnection(
jobServerUrl,
jobServerCredential,
trace: Trace,
new DelegatingHandler[] { new ThrottlingReportHandler(_jobServerQueue) }
);
await jobServer.ConnectAsync(jobConnection);
_jobServerQueue.Start(message);
HostContext.WritePerfCounter($"WorkerJobServerQueueStarted_{message.RequestId.ToString()}");
IExecutionContext jobContext = null;
CancellationTokenRegistration? agentShutdownRegistration = null;
try
{
// Create the job execution context.
jobContext = HostContext.CreateService<IExecutionContext>();
jobContext.InitializeJob(message, jobRequestCancellationToken);
Trace.Info("Starting the job execution context.");
jobContext.Start();
jobContext.Section(StringUtil.Loc("StepStarting", message.JobDisplayName));
agentShutdownRegistration = HostContext.AgentShutdownToken.Register(() =>
{
// log an issue, then agent get shutdown by Ctrl-C or Ctrl-Break.
// the server will use Ctrl-Break to tells the agent that operating system is shutting down.
string errorMessage;
switch (HostContext.AgentShutdownReason)
{
case ShutdownReason.UserCancelled:
errorMessage = StringUtil.Loc("UserShutdownAgent");
break;
case ShutdownReason.OperatingSystemShutdown:
errorMessage = StringUtil.Loc("OperatingSystemShutdown", Environment.MachineName);
break;
default:
throw new ArgumentException(HostContext.AgentShutdownReason.ToString(), nameof(HostContext.AgentShutdownReason));
}
jobContext.AddIssue(new Issue() { Type = IssueType.Error, Message = errorMessage });
});
// Validate directory permissions.
string workDirectory = HostContext.GetDirectory(WellKnownDirectory.Work);
Trace.Info($"Validating directory permissions for: '{workDirectory}'");
try
{
Directory.CreateDirectory(workDirectory);
IOUtil.ValidateExecutePermission(workDirectory);
}
catch (Exception ex)
{
Trace.Error(ex);
jobContext.Error(ex);
return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Failed);
}
// Set agent variables.
AgentSettings settings = HostContext.GetService<IConfigurationStore>().GetSettings();
jobContext.SetVariable(Constants.Variables.Agent.Id, settings.AgentId.ToString(CultureInfo.InvariantCulture));
jobContext.SetVariable(Constants.Variables.Agent.HomeDirectory, HostContext.GetDirectory(WellKnownDirectory.Root), isFilePath: true);
jobContext.SetVariable(Constants.Variables.Agent.JobName, message.JobDisplayName);
jobContext.SetVariable(Constants.Variables.Agent.MachineName, Environment.MachineName);
jobContext.SetVariable(Constants.Variables.Agent.Name, settings.AgentName);
jobContext.SetVariable(Constants.Variables.Agent.OS, VarUtil.OS);
jobContext.SetVariable(Constants.Variables.Agent.OSArchitecture, VarUtil.OSArchitecture);
jobContext.SetVariable(Constants.Variables.Agent.RootDirectory, HostContext.GetDirectory(WellKnownDirectory.Work), isFilePath: true);
if (PlatformUtil.RunningOnWindows)
{
jobContext.SetVariable(Constants.Variables.Agent.ServerOMDirectory, HostContext.GetDirectory(WellKnownDirectory.ServerOM), isFilePath: true);
}
if (!PlatformUtil.RunningOnWindows)
{
jobContext.SetVariable(Constants.Variables.Agent.AcceptTeeEula, settings.AcceptTeeEula.ToString());
}
jobContext.SetVariable(Constants.Variables.Agent.WorkFolder, HostContext.GetDirectory(WellKnownDirectory.Work), isFilePath: true);
jobContext.SetVariable(Constants.Variables.System.WorkFolder, HostContext.GetDirectory(WellKnownDirectory.Work), isFilePath: true);
string toolsDirectory = HostContext.GetDirectory(WellKnownDirectory.Tools);
Directory.CreateDirectory(toolsDirectory);
jobContext.SetVariable(Constants.Variables.Agent.ToolsDirectory, toolsDirectory, isFilePath: true);
if (AgentKnobs.DisableGitPrompt.GetValue(jobContext).AsBoolean())
{
jobContext.SetVariable("GIT_TERMINAL_PROMPT", "0");
}
// Setup TEMP directories
_tempDirectoryManager = HostContext.GetService<ITempDirectoryManager>();
_tempDirectoryManager.InitializeTempDirectory(jobContext);
// todo: task server can throw. try/catch and fail job gracefully.
// prefer task definitions url, then TFS collection url, then TFS account url
var taskServer = HostContext.GetService<ITaskServer>();
Uri taskServerUri = null;
if (!string.IsNullOrEmpty(jobContext.Variables.System_TaskDefinitionsUri))
{
taskServerUri = new Uri(jobContext.Variables.System_TaskDefinitionsUri);
}
else if (!string.IsNullOrEmpty(jobContext.Variables.System_TFCollectionUrl))
{
taskServerUri = new Uri(jobContext.Variables.System_TFCollectionUrl);
}
var taskServerCredential = VssUtil.GetVssCredential(systemConnection);
if (taskServerUri != null)
{
Trace.Info($"Creating task server with {taskServerUri}");
await taskServer.ConnectAsync(VssUtil.CreateConnection(taskServerUri, taskServerCredential, trace: Trace));
}
// for back compat TFS 2015 RTM/QU1, we may need to switch the task server url to agent config url
if (!string.Equals(message?.Variables.GetValueOrDefault(Constants.Variables.System.ServerType)?.Value, "Hosted", StringComparison.OrdinalIgnoreCase))
{
if (taskServerUri == null || !await taskServer.TaskDefinitionEndpointExist())
{
Trace.Info($"Can't determine task download url from JobMessage or the endpoint doesn't exist.");
var configStore = HostContext.GetService<IConfigurationStore>();
taskServerUri = new Uri(configStore.GetSettings().ServerUrl);
Trace.Info($"Recreate task server with configuration server url: {taskServerUri}");
await taskServer.ConnectAsync(VssUtil.CreateConnection(taskServerUri, taskServerCredential, trace: Trace));
}
}
// Expand the endpoint data values.
foreach (ServiceEndpoint endpoint in jobContext.Endpoints)
{
jobContext.Variables.ExpandValues(target: endpoint.Data);
VarUtil.ExpandEnvironmentVariables(HostContext, target: endpoint.Data);
}
// Expand the repository property values.
foreach (var repository in jobContext.Repositories)
{
// expand checkout option
var checkoutOptions = repository.Properties.Get<JToken>(Pipelines.RepositoryPropertyNames.CheckoutOptions);
if (checkoutOptions != null)
{
checkoutOptions = jobContext.Variables.ExpandValues(target: checkoutOptions);
checkoutOptions = VarUtil.ExpandEnvironmentVariables(HostContext, target: checkoutOptions);
repository.Properties.Set<JToken>(Pipelines.RepositoryPropertyNames.CheckoutOptions, checkoutOptions); ;
}
// expand workspace mapping
var mappings = repository.Properties.Get<JToken>(Pipelines.RepositoryPropertyNames.Mappings);
if (mappings != null)
{
mappings = jobContext.Variables.ExpandValues(target: mappings);
mappings = VarUtil.ExpandEnvironmentVariables(HostContext, target: mappings);
repository.Properties.Set<JToken>(Pipelines.RepositoryPropertyNames.Mappings, mappings);
}
}
// Expand container properties
foreach (var container in jobContext.Containers)
{
this.ExpandProperties(container, jobContext.Variables);
}
foreach (var sidecar in jobContext.SidecarContainers)
{
this.ExpandProperties(sidecar, jobContext.Variables);
}
// Get the job extension.
Trace.Info("Getting job extension.");
var hostType = jobContext.Variables.System_HostType;
var extensionManager = HostContext.GetService<IExtensionManager>();
// We should always have one job extension
IJobExtension jobExtension =
(extensionManager.GetExtensions<IJobExtension>() ?? new List<IJobExtension>())
.Where(x => x.HostType.HasFlag(hostType))
.FirstOrDefault();
ArgUtil.NotNull(jobExtension, nameof(jobExtension));
List<IStep> jobSteps = null;
try
{
Trace.Info("Initialize job. Getting all job steps.");
jobSteps = await jobExtension.InitializeJob(jobContext, message);
}
catch (OperationCanceledException ex) when (jobContext.CancellationToken.IsCancellationRequested)
{
// set the job to canceled
// don't log error issue to job ExecutionContext, since server owns the job level issue
Trace.Error($"Job is canceled during initialize.");
Trace.Error($"Caught exception: {ex}");
return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Canceled);
}
catch (Exception ex)
{
// set the job to failed.
// don't log error issue to job ExecutionContext, since server owns the job level issue
Trace.Error($"Job initialize failed.");
Trace.Error($"Caught exception from {nameof(jobExtension.InitializeJob)}: {ex}");
return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Failed);
}
// trace out all steps
Trace.Info($"Total job steps: {jobSteps.Count}.");
Trace.Verbose($"Job steps: '{string.Join(", ", jobSteps.Select(x => x.DisplayName))}'");
HostContext.WritePerfCounter($"WorkerJobInitialized_{message?.RequestId.ToString()}");
// Run all job steps
Trace.Info("Run all job steps.");
var stepsRunner = HostContext.GetService<IStepsRunner>();
try
{
await stepsRunner.RunAsync(jobContext, jobSteps);
}
catch (Exception ex)
{
// StepRunner should never throw exception out.
// End up here mean there is a bug in StepRunner
// Log the error and fail the job.
Trace.Error($"Caught exception from job steps {nameof(StepsRunner)}: {ex}");
jobContext.Error(ex);
return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Failed);
}
finally
{
Trace.Info("Finalize job.");
await jobExtension.FinalizeJob(jobContext);
}
Trace.Info($"Job result after all job steps finish: {jobContext.Result ?? TaskResult.Succeeded}");
if (jobContext.Variables.GetBoolean(Constants.Variables.Agent.Diagnostic) ?? false)
{
Trace.Info("Support log upload starting.");
IDiagnosticLogManager diagnosticLogManager = HostContext.GetService<IDiagnosticLogManager>();
try
{
await diagnosticLogManager.UploadDiagnosticLogsAsync(executionContext: jobContext, message: message, jobStartTimeUtc: jobStartTimeUtc);
Trace.Info("Support log upload complete.");
}
catch (Exception ex)
{
// Log the error but make sure we continue gracefully.
Trace.Info("Error uploading support logs.");
Trace.Error(ex);
}
}
Trace.Info("Completing the job execution context.");
return await CompleteJobAsync(jobServer, jobContext, message);
}
catch (AggregateException e)
{
ExceptionsUtil.HandleAggregateException((AggregateException)e, Trace.Error);
return TaskResult.Failed;
}
finally
{
if (agentShutdownRegistration != null)
{
agentShutdownRegistration.Value.Dispose();
agentShutdownRegistration = null;
}
await ShutdownQueue(throwOnFailure: false);
}
}