in src/Agent.Listener/JobDispatcher.cs [325:678]
private async Task RunAsync(Pipelines.AgentJobRequestMessage message, WorkerDispatcher previousJobDispatch, WorkerDispatcher newJobDispatch)
{
if (previousJobDispatch != null)
{
Trace.Verbose($"Make sure the previous job request {previousJobDispatch.JobId} has successfully finished on worker.");
await EnsureDispatchFinished(previousJobDispatch);
}
else
{
Trace.Verbose($"This is the first job request.");
}
var jobRequestCancellationToken = newJobDispatch.WorkerCancellationTokenSource.Token;
var workerCancelTimeoutKillToken = newJobDispatch.WorkerCancelTimeoutKillTokenSource.Token;
var term = HostContext.GetService<ITerminal>();
term.WriteLine(StringUtil.Loc("RunningJob", DateTime.UtcNow, message.JobDisplayName));
// first job request renew succeed.
TaskCompletionSource<int> firstJobRequestRenewed = new TaskCompletionSource<int>();
var notification = HostContext.GetService<IJobNotification>();
// lock renew cancellation token.
using (var lockRenewalTokenSource = new CancellationTokenSource())
using (var workerProcessCancelTokenSource = new CancellationTokenSource())
{
long requestId = message.RequestId;
Guid lockToken = Guid.Empty; // lockToken has never been used, keep this here of compat
// Because an agent can be idle for a long time between jobs, it is possible that in that time
// a firewall has closed the connection. For that reason, forcibly reestablish this connection at the
// start of a new job
var agentServer = HostContext.GetService<IAgentServer>();
await agentServer.RefreshConnectionAsync(AgentConnectionType.JobRequest, TimeSpan.FromSeconds(30));
// start renew job request
Trace.Info($"Start renew job request {requestId} for job {message.JobId}.");
Task renewJobRequest = RenewJobRequestAsync(_poolId, requestId, lockToken, firstJobRequestRenewed, lockRenewalTokenSource.Token);
// wait till first renew succeed or job request is canceled
// not even start worker if the first renew fail
await Task.WhenAny(firstJobRequestRenewed.Task, renewJobRequest, Task.Delay(-1, jobRequestCancellationToken));
if (renewJobRequest.IsCompleted)
{
// renew job request task complete means we run out of retry for the first job request renew.
Trace.Info($"Unable to renew job request for job {message.JobId} for the first time, stop dispatching job to worker.");
return;
}
if (jobRequestCancellationToken.IsCancellationRequested)
{
Trace.Info($"Stop renew job request for job {message.JobId}.");
// stop renew lock
lockRenewalTokenSource.Cancel();
// renew job request should never blows up.
await renewJobRequest;
// complete job request with result Cancelled
await CompleteJobRequestAsync(_poolId, message, lockToken, TaskResult.Canceled);
return;
}
HostContext.WritePerfCounter($"JobRequestRenewed_{requestId.ToString()}");
Task<int> workerProcessTask = null;
object _outputLock = new object();
List<string> workerOutput = new List<string>();
using (var processChannel = HostContext.CreateService<IProcessChannel>())
using (var processInvoker = HostContext.CreateService<IProcessInvoker>())
{
// Start the process channel.
// It's OK if StartServer bubbles an execption after the worker process has already started.
// The worker will shutdown after 30 seconds if it hasn't received the job message.
processChannel.StartServer(
// Delegate to start the child process.
startProcess: (string pipeHandleOut, string pipeHandleIn) =>
{
// Validate args.
ArgUtil.NotNullOrEmpty(pipeHandleOut, nameof(pipeHandleOut));
ArgUtil.NotNullOrEmpty(pipeHandleIn, nameof(pipeHandleIn));
// Save STDOUT from worker, worker will use STDOUT report unhandle exception.
processInvoker.OutputDataReceived += delegate (object sender, ProcessDataReceivedEventArgs stdout)
{
if (!string.IsNullOrEmpty(stdout.Data))
{
lock (_outputLock)
{
workerOutput.Add(stdout.Data);
}
}
};
// Save STDERR from worker, worker will use STDERR on crash.
processInvoker.ErrorDataReceived += delegate (object sender, ProcessDataReceivedEventArgs stderr)
{
if (!string.IsNullOrEmpty(stderr.Data))
{
lock (_outputLock)
{
workerOutput.Add(stderr.Data);
}
}
};
// Start the child process.
HostContext.WritePerfCounter("StartingWorkerProcess");
var assemblyDirectory = HostContext.GetDirectory(WellKnownDirectory.Bin);
string workerFileName = Path.Combine(assemblyDirectory, _workerProcessName);
workerProcessTask = processInvoker.ExecuteAsync(
workingDirectory: assemblyDirectory,
fileName: workerFileName,
arguments: "spawnclient " + pipeHandleOut + " " + pipeHandleIn,
environment: null,
requireExitCodeZero: false,
outputEncoding: null,
killProcessOnCancel: true,
redirectStandardIn: null,
inheritConsoleHandler: false,
keepStandardInOpen: false,
highPriorityProcess: true,
cancellationToken: workerProcessCancelTokenSource.Token);
}
);
// Send the job request message.
// Kill the worker process if sending the job message times out. The worker
// process may have successfully received the job message.
try
{
var body = JsonUtility.ToString(message);
var numBytes = System.Text.ASCIIEncoding.Unicode.GetByteCount(body) / 1024;
string numBytesString = numBytes > 0 ? $"{numBytes} KB" : " < 1 KB";
Trace.Info($"Send job request message to worker for job {message.JobId} ({numBytesString}).");
HostContext.WritePerfCounter($"AgentSendingJobToWorker_{message.JobId}");
var stopWatch = Stopwatch.StartNew();
using (var csSendJobRequest = new CancellationTokenSource(_channelTimeout))
{
await processChannel.SendAsync(
messageType: MessageType.NewJobRequest,
body: body,
cancellationToken: csSendJobRequest.Token);
}
stopWatch.Stop();
Trace.Info($"Took {stopWatch.ElapsedMilliseconds} ms to send job message to worker");
}
catch (OperationCanceledException)
{
// message send been cancelled.
// timeout 30 sec. kill worker.
Trace.Info($"Job request message sending for job {message.JobId} been cancelled after waiting for {_channelTimeout.TotalSeconds} seconds, kill running worker.");
workerProcessCancelTokenSource.Cancel();
try
{
await workerProcessTask;
}
catch (OperationCanceledException)
{
Trace.Info("worker process has been killed.");
}
Trace.Info($"Stop renew job request for job {message.JobId}.");
// stop renew lock
lockRenewalTokenSource.Cancel();
// renew job request should never blows up.
await renewJobRequest;
// not finish the job request since the job haven't run on worker at all, we will not going to set a result to server.
return;
}
// we get first jobrequest renew succeed and start the worker process with the job message.
// send notification to machine provisioner.
var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
var accessToken = systemConnection?.Authorization?.Parameters["AccessToken"];
VariableValue identifier = null;
VariableValue definitionId = null;
if (message.Plan.PlanType == "Build")
{
message.Variables.TryGetValue("build.buildId", out identifier);
message.Variables.TryGetValue("system.definitionId", out definitionId);
}
else if (message.Plan.PlanType == "Release")
{
message.Variables.TryGetValue("release.deploymentId", out identifier);
message.Variables.TryGetValue("release.definitionId", out definitionId);
}
await notification.JobStarted(message.JobId, accessToken, systemConnection.Url, message.Plan.PlanId, (identifier?.Value ?? "0"), (definitionId?.Value ?? "0"), message.Plan.PlanType);
HostContext.WritePerfCounter($"SentJobToWorker_{requestId.ToString()}");
try
{
TaskResult resultOnAbandonOrCancel = TaskResult.Succeeded;
// wait for renewlock, worker process or cancellation token been fired.
// keep listening iff we receive a metadata update
bool keepListening = true;
while (keepListening)
{
var metadataUpdateTask = newJobDispatch.MetadataSource.Task;
var completedTask = await Task.WhenAny(renewJobRequest, workerProcessTask, Task.Delay(-1, jobRequestCancellationToken), metadataUpdateTask);
if (completedTask == workerProcessTask)
{
keepListening = false;
// worker finished successfully, complete job request with result, attach unhandled exception reported by worker, stop renew lock, job has finished.
int returnCode = await workerProcessTask;
Trace.Info($"Worker finished for job {message.JobId}. Code: " + returnCode);
string detailInfo = null;
if (!TaskResultUtil.IsValidReturnCode(returnCode))
{
detailInfo = string.Join(Environment.NewLine, workerOutput);
Trace.Info($"Return code {returnCode} indicate worker encounter an unhandled exception or app crash, attach worker stdout/stderr to JobRequest result.");
await LogWorkerProcessUnhandledException(message, detailInfo);
}
TaskResult result = TaskResultUtil.TranslateFromReturnCode(returnCode);
Trace.Info($"finish job request for job {message.JobId} with result: {result}");
term.WriteLine(StringUtil.Loc("JobCompleted", DateTime.UtcNow, message.JobDisplayName, result));
Trace.Info($"Stop renew job request for job {message.JobId}.");
// stop renew lock
lockRenewalTokenSource.Cancel();
// renew job request should never blows up.
await renewJobRequest;
// complete job request
await CompleteJobRequestAsync(_poolId, message, lockToken, result, detailInfo);
// print out unhandled exception happened in worker after we complete job request.
// when we run out of disk space, report back to server has higher priority.
if (!string.IsNullOrEmpty(detailInfo))
{
Trace.Error("Unhandled exception happened in worker:");
Trace.Error(detailInfo);
}
return;
}
else if (completedTask == renewJobRequest)
{
keepListening = false;
resultOnAbandonOrCancel = TaskResult.Abandoned;
}
else if (completedTask == metadataUpdateTask)
{
Trace.Info($"Send job metadata update message to worker for job {message.JobId}.");
using (var csSendCancel = new CancellationTokenSource(_channelTimeout))
{
var body = JsonUtility.ToString(metadataUpdateTask.Result);
await processChannel.SendAsync(
messageType: MessageType.JobMetadataUpdate,
body: body,
cancellationToken: csSendCancel.Token);
}
newJobDispatch.ResetMetadataSource();
}
else
{
keepListening = false;
resultOnAbandonOrCancel = TaskResult.Canceled;
}
}
// renew job request completed or job request cancellation token been fired for RunAsync(jobrequestmessage)
// cancel worker gracefully first, then kill it after worker cancel timeout
try
{
Trace.Info($"Send job cancellation message to worker for job {message.JobId}.");
using (var csSendCancel = new CancellationTokenSource(_channelTimeout))
{
var messageType = MessageType.CancelRequest;
if (HostContext.AgentShutdownToken.IsCancellationRequested)
{
switch (HostContext.AgentShutdownReason)
{
case ShutdownReason.UserCancelled:
messageType = MessageType.AgentShutdown;
break;
case ShutdownReason.OperatingSystemShutdown:
messageType = MessageType.OperatingSystemShutdown;
break;
}
}
await processChannel.SendAsync(
messageType: messageType,
body: string.Empty,
cancellationToken: csSendCancel.Token);
}
}
catch (OperationCanceledException)
{
// message send been cancelled.
Trace.Info($"Job cancel message sending for job {message.JobId} been cancelled, kill running worker.");
workerProcessCancelTokenSource.Cancel();
try
{
await workerProcessTask;
}
catch (OperationCanceledException)
{
Trace.Info("worker process has been killed.");
}
}
// wait worker to exit
// if worker doesn't exit within timeout, then kill worker.
var exitTask = await Task.WhenAny(workerProcessTask, Task.Delay(-1, workerCancelTimeoutKillToken));
// worker haven't exit within cancellation timeout.
if (exitTask != workerProcessTask)
{
Trace.Info($"worker process for job {message.JobId} haven't exit within cancellation timout, kill running worker.");
workerProcessCancelTokenSource.Cancel();
try
{
await workerProcessTask;
}
catch (OperationCanceledException)
{
Trace.Info("worker process has been killed.");
}
}
Trace.Info($"finish job request for job {message.JobId} with result: {resultOnAbandonOrCancel}");
term.WriteLine(StringUtil.Loc("JobCompleted", DateTime.UtcNow, message.JobDisplayName, resultOnAbandonOrCancel));
// complete job request with cancel result, stop renew lock, job has finished.
Trace.Info($"Stop renew job request for job {message.JobId}.");
// stop renew lock
lockRenewalTokenSource.Cancel();
// renew job request should never blows up.
await renewJobRequest;
// complete job request
await CompleteJobRequestAsync(_poolId, message, lockToken, resultOnAbandonOrCancel);
}
catch (AggregateException e)
{
ExceptionsUtil.HandleAggregateException((AggregateException)e, Trace.Error);
}
finally
{
// This should be the last thing to run so we don't notify external parties until actually finished
await notification.JobCompleted(message.JobId);
}
}
}
}