in src/Agent.Listener/Agent.cs [291:512]
/// <summary>
/// Creates a message-listener session and then runs the message queue loop: each received
/// message is dispatched by type (agent refresh, job request, job cancel, job metadata)
/// until the agent shutdown token is cancelled, a self-update is ready to be applied, or,
/// in run-once mode, the dispatched job completes.
/// </summary>
/// <param name="settings">
/// Agent settings. Only the notification socket address, notification pipe name and monitor
/// socket address are read here, to start the job-notification client.
/// </param>
/// <param name="runOnce">
/// When <c>true</c>, the flag is forwarded to the job dispatcher, further job requests are left
/// undeleted once the first job has been received, and the method returns as soon as
/// <c>jobDispatcher.RunOnceJobCompleted</c> completes.
/// </param>
/// <returns>
/// <c>TerminatedError</c> when the session cannot be created;
/// <c>RunOnceAgentUpdating</c> or <c>AgentUpdating</c> when the self-update task reports that an
/// update is ready (depending on <paramref name="runOnce"/>);
/// otherwise <c>Success</c> (run-once job finished, loop ended on shutdown, or the access token
/// expired).
/// </returns>
private async Task<int> RunAsync(AgentSettings settings, bool runOnce = false)
{
    try
    {
        Trace.Info(nameof(RunAsync));
        _listener = HostContext.GetService<IMessageListener>();
        if (!await _listener.CreateSessionAsync(HostContext.AgentShutdownToken))
        {
            return Constants.Agent.ReturnCode.TerminatedError;
        }

        HostContext.WritePerfCounter("SessionCreated");
        _term.WriteLine(StringUtil.Loc("ListenForJobs", DateTime.UtcNow));

        IJobDispatcher jobDispatcher = null;

        // Linked to the agent shutdown token so the message loop can also be stopped on its own
        // (self-update ready / run-once job finished) without cancelling the whole agent.
        CancellationTokenSource messageQueueLoopTokenSource = CancellationTokenSource.CreateLinkedTokenSource(HostContext.AgentShutdownToken);
        try
        {
            var notification = HostContext.GetService<IJobNotification>();
            if (!string.IsNullOrEmpty(settings.NotificationSocketAddress))
            {
                notification.StartClient(settings.NotificationSocketAddress, settings.MonitorSocketAddress);
            }
            else
            {
                notification.StartClient(settings.NotificationPipeName, settings.MonitorSocketAddress, HostContext.AgentShutdownToken);
            }

            // this is not a reliable way to disable auto update.
            // we need server side work to really enable the feature
            // https://github.com/Microsoft/vsts-agent/issues/446 (Feature: Allow agent / pool to opt out of automatic updates)
            bool disableAutoUpdate = !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("agent.disableupdate"));
            bool autoUpdateInProgress = false;
            Task<bool> selfUpdateTask = null;
            bool runOnceJobReceived = false;
            jobDispatcher = HostContext.CreateService<IJobDispatcher>();

            while (!HostContext.AgentShutdownToken.IsCancellationRequested)
            {
                TaskAgentMessage message = null;
                bool skipMessageDeletion = false;
                try
                {
                    Task<TaskAgentMessage> getNextMessage = _listener.GetNextMessageAsync(messageQueueLoopTokenSource.Token);

                    if (autoUpdateInProgress)
                    {
                        // Race the pending message fetch against the background self-update.
                        Trace.Verbose("Auto update task running at backend, waiting for getNextMessage or selfUpdateTask to finish.");
                        Task completeTask = await Task.WhenAny(getNextMessage, selfUpdateTask);
                        if (completeTask == selfUpdateTask)
                        {
                            autoUpdateInProgress = false;
                            if (await selfUpdateTask)
                            {
                                Trace.Info("Auto update task finished at backend, an agent update is ready to apply exit the current agent instance.");
                                Trace.Info("Stop message queue looping.");
                                messageQueueLoopTokenSource.Cancel();
                                try
                                {
                                    // Observe the cancelled fetch so it does not fault unobserved.
                                    await getNextMessage;
                                }
                                catch (Exception ex)
                                {
                                    Trace.Info($"Ignore any exception after cancel message loop. {ex}");
                                }

                                if (runOnce)
                                {
                                    return Constants.Agent.ReturnCode.RunOnceAgentUpdating;
                                }
                                else
                                {
                                    return Constants.Agent.ReturnCode.AgentUpdating;
                                }
                            }
                            else
                            {
                                Trace.Info("Auto update task finished at backend, there is no available agent update needs to apply, continue message queue looping.");
                            }
                        }
                    }

                    if (runOnceJobReceived)
                    {
                        // Race the pending message fetch against completion of the run-once job.
                        Trace.Verbose("One time used agent has start running its job, waiting for getNextMessage or the job to finish.");
                        Task completeTask = await Task.WhenAny(getNextMessage, jobDispatcher.RunOnceJobCompleted.Task);
                        if (completeTask == jobDispatcher.RunOnceJobCompleted.Task)
                        {
                            Trace.Info("Job has finished at backend, the agent will exit since it is running under onetime use mode.");
                            Trace.Info("Stop message queue looping.");
                            messageQueueLoopTokenSource.Cancel();
                            try
                            {
                                // Observe the cancelled fetch so it does not fault unobserved.
                                await getNextMessage;
                            }
                            catch (Exception ex)
                            {
                                Trace.Info($"Ignore any exception after cancel message loop. {ex}");
                            }

                            return Constants.Agent.ReturnCode.Success;
                        }
                    }

                    message = await getNextMessage; //get next message
                    HostContext.WritePerfCounter($"MessageReceived_{message.MessageType}");

                    if (string.Equals(message.MessageType, AgentRefreshMessage.MessageType, StringComparison.OrdinalIgnoreCase))
                    {
                        if (disableAutoUpdate)
                        {
                            Trace.Info("Refresh message received, skip autoupdate since environment variable agent.disableupdate is set.");
                        }
                        else
                        {
                            if (!autoUpdateInProgress)
                            {
                                autoUpdateInProgress = true;
                                var agentUpdateMessage = JsonUtility.FromString<AgentRefreshMessage>(message.Body);
                                var selfUpdater = HostContext.GetService<ISelfUpdater>();

                                // Not awaited here: the update runs in the background and is raced
                                // against the message fetch at the top of the next iterations.
                                selfUpdateTask = selfUpdater.SelfUpdate(agentUpdateMessage, jobDispatcher, !runOnce && HostContext.StartupType != StartupType.Service, HostContext.AgentShutdownToken);
                                Trace.Info("Refresh message received, kick-off selfupdate background process.");
                            }
                            else
                            {
                                Trace.Info("Refresh message received, skip autoupdate since a previous autoupdate is already running.");
                            }
                        }
                    }
                    else if (string.Equals(message.MessageType, JobRequestMessageTypes.AgentJobRequest, StringComparison.OrdinalIgnoreCase) ||
                             string.Equals(message.MessageType, JobRequestMessageTypes.PipelineAgentJobRequest, StringComparison.OrdinalIgnoreCase))
                    {
                        if (autoUpdateInProgress || runOnceJobReceived)
                        {
                            // The job is not run and the message is not deleted from the queue.
                            skipMessageDeletion = true;
                            Trace.Info($"Skip message deletion for job request message '{message.MessageId}'.");
                        }
                        else
                        {
                            // Use the same case-insensitive comparison as the enclosing branch so a
                            // message type that differs only by casing never yields a null job message.
                            Pipelines.AgentJobRequestMessage pipelineJobMessage;
                            if (string.Equals(message.MessageType, JobRequestMessageTypes.AgentJobRequest, StringComparison.OrdinalIgnoreCase))
                            {
                                var legacyJobMessage = JsonUtility.FromString<AgentJobRequestMessage>(message.Body);
                                pipelineJobMessage = Pipelines.AgentJobRequestMessageUtil.Convert(legacyJobMessage);
                            }
                            else
                            {
                                // Only PipelineAgentJobRequest can reach here (see enclosing condition).
                                pipelineJobMessage = JsonUtility.FromString<Pipelines.AgentJobRequestMessage>(message.Body);
                            }

                            jobDispatcher.Run(pipelineJobMessage, runOnce);
                            if (runOnce)
                            {
                                Trace.Info("One time used agent received job message.");
                                runOnceJobReceived = true;
                            }
                        }
                    }
                    else if (string.Equals(message.MessageType, JobCancelMessage.MessageType, StringComparison.OrdinalIgnoreCase))
                    {
                        var cancelJobMessage = JsonUtility.FromString<JobCancelMessage>(message.Body);
                        bool jobCancelled = jobDispatcher.Cancel(cancelJobMessage);

                        // Keep the cancel message only when the dispatcher did not cancel anything
                        // while an update or a run-once job is in progress.
                        skipMessageDeletion = (autoUpdateInProgress || runOnceJobReceived) && !jobCancelled;
                        if (skipMessageDeletion)
                        {
                            Trace.Info($"Skip message deletion for cancellation message '{message.MessageId}'.");
                        }
                    }
                    else if (string.Equals(message.MessageType, JobMetadataMessage.MessageType, StringComparison.OrdinalIgnoreCase))
                    {
                        var metadataMessage = JsonUtility.FromString<JobMetadataMessage>(message.Body);
                        jobDispatcher.MetadataUpdate(metadataMessage);
                    }
                    else
                    {
                        Trace.Error($"Received message {message.MessageId} with unsupported message type {message.MessageType}.");
                    }
                }
                catch (AggregateException e)
                {
                    ExceptionsUtil.HandleAggregateException(e, Trace.Error);
                }
                finally
                {
                    // Delete the handled message unless a branch above asked to keep it.
                    if (!skipMessageDeletion && message != null)
                    {
                        try
                        {
                            await _listener.DeleteMessageAsync(message);
                        }
                        catch (Exception ex)
                        {
                            Trace.Error($"Catch exception during delete message from message queue. message id: {message.MessageId}");
                            Trace.Error(ex);
                        }
                        finally
                        {
                            message = null;
                        }
                    }
                }
            }
        }
        finally
        {
            try
            {
                if (jobDispatcher != null)
                {
                    await jobDispatcher.ShutdownAsync();
                }

                //TODO: make sure we don't mask more important exception
                await _listener.DeleteSessionAsync();
            }
            finally
            {
                // Always release the linked token source (and its registration on the agent
                // shutdown token), even when shutdown or session deletion throws.
                messageQueueLoopTokenSource.Dispose();
            }
        }
    }
    catch (TaskAgentAccessTokenExpiredException)
    {
        Trace.Info("Agent OAuth token has been revoked. Shutting down.");
    }

    return Constants.Agent.ReturnCode.Success;
}