private async Task RunAsync()

in src/Agent.Listener/JobDispatcher.cs [325:678]


        /// <summary>
        /// Dispatches one job request to a worker process and drives it to completion.
        /// Waits for the previous dispatch (if any) to finish, starts renewing the job request lease,
        /// launches the worker over a process channel and sends it the job message, then waits for the
        /// worker to exit, the lease renewal to stop, or the job to be cancelled, forwarding any job
        /// metadata updates to the worker in the meantime. Finally it reports the result through
        /// <c>CompleteJobRequestAsync</c>.
        /// </summary>
        /// <param name="message">The job request message that is serialized and sent to the worker.</param>
        /// <param name="previousJobDispatch">
        /// The previous job's dispatch, or <c>null</c> for the first job. When non-null it is passed to
        /// <c>EnsureDispatchFinished</c> before anything else happens.
        /// </param>
        /// <param name="newJobDispatch">
        /// The dispatch for this job. Supplies the job cancellation token, the cancel-timeout kill token
        /// and the metadata update source.
        /// </param>
        /// <remarks>
        /// Early exits that do NOT report a result to the server:
        /// - the renewal task completes before the first renewal succeeds;
        /// - sending the job message to the worker times out (the worker is killed).
        /// If the job is cancelled before the first renewal succeeds, the request is completed as Canceled
        /// without starting a worker.
        /// </remarks>
        private async Task RunAsync(Pipelines.AgentJobRequestMessage message, WorkerDispatcher previousJobDispatch, WorkerDispatcher newJobDispatch)
        {
            if (previousJobDispatch != null)
            {
                Trace.Verbose($"Make sure the previous job request {previousJobDispatch.JobId} has successfully finished on worker.");
                await EnsureDispatchFinished(previousJobDispatch);
            }
            else
            {
                Trace.Verbose($"This is the first job request.");
            }

            var jobRequestCancellationToken = newJobDispatch.WorkerCancellationTokenSource.Token;
            var workerCancelTimeoutKillToken = newJobDispatch.WorkerCancelTimeoutKillTokenSource.Token;

            var term = HostContext.GetService<ITerminal>();
            term.WriteLine(StringUtil.Loc("RunningJob", DateTime.UtcNow, message.JobDisplayName));

            // Handed to RenewJobRequestAsync, which is expected to complete it once the first renewal succeeds.
            // RunContinuationsAsynchronously keeps whoever completes it from running the rest of this
            // method inline on its own stack.
            var firstJobRequestRenewed = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
            var notification = HostContext.GetService<IJobNotification>();

            // lockRenewalTokenSource stops the renewal loop; workerProcessCancelTokenSource kills the worker.
            using (var lockRenewalTokenSource = new CancellationTokenSource())
            using (var workerProcessCancelTokenSource = new CancellationTokenSource())
            {
                long requestId = message.RequestId;
                Guid lockToken = Guid.Empty; // lockToken has never been used, keep this here for compat
                // Because an agent can be idle for a long time between jobs, it is possible that in that time
                // a firewall has closed the connection. For that reason, forcibly reestablish this connection at the
                // start of a new job
                var agentServer = HostContext.GetService<IAgentServer>();
                await agentServer.RefreshConnectionAsync(AgentConnectionType.JobRequest, TimeSpan.FromSeconds(30));

                // start renew job request
                Trace.Info($"Start renew job request {requestId} for job {message.JobId}.");
                Task renewJobRequest = RenewJobRequestAsync(_poolId, requestId, lockToken, firstJobRequestRenewed, lockRenewalTokenSource.Token);

                // Wait until the first renewal succeeds or the job request is cancelled;
                // don't even start the worker if the first renewal fails.
                await Task.WhenAny(firstJobRequestRenewed.Task, renewJobRequest, Task.Delay(-1, jobRequestCancellationToken));

                if (renewJobRequest.IsCompleted)
                {
                    // renew job request task complete means we run out of retry for the first job request renew.
                    Trace.Info($"Unable to renew job request for job {message.JobId} for the first time, stop dispatching job to worker.");
                    return;
                }

                if (jobRequestCancellationToken.IsCancellationRequested)
                {
                    Trace.Info($"Stop renew job request for job {message.JobId}.");
                    // stop renew lock
                    lockRenewalTokenSource.Cancel();
                    // renew job request should never blow up.
                    await renewJobRequest;

                    // complete job request with result Cancelled
                    await CompleteJobRequestAsync(_poolId, message, lockToken, TaskResult.Canceled);
                    return;
                }

                HostContext.WritePerfCounter($"JobRequestRenewed_{requestId}");

                Task<int> workerProcessTask = null;
                // Guards workerOutput, which is appended to from the process output/error callbacks.
                object outputLock = new object();
                List<string> workerOutput = new List<string>();
                using (var processChannel = HostContext.CreateService<IProcessChannel>())
                using (var processInvoker = HostContext.CreateService<IProcessInvoker>())
                {
                    // Start the process channel.
                    // It's OK if StartServer bubbles an exception after the worker process has already started.
                    // The worker will shutdown after 30 seconds if it hasn't received the job message.
                    processChannel.StartServer(
                        // Delegate to start the child process.
                        startProcess: (string pipeHandleOut, string pipeHandleIn) =>
                        {
                            // Validate args.
                            ArgUtil.NotNullOrEmpty(pipeHandleOut, nameof(pipeHandleOut));
                            ArgUtil.NotNullOrEmpty(pipeHandleIn, nameof(pipeHandleIn));

                            // Save STDOUT from worker, worker will use STDOUT to report unhandled exceptions.
                            processInvoker.OutputDataReceived += delegate (object sender, ProcessDataReceivedEventArgs stdout)
                            {
                                if (!string.IsNullOrEmpty(stdout.Data))
                                {
                                    lock (outputLock)
                                    {
                                        workerOutput.Add(stdout.Data);
                                    }
                                }
                            };

                            // Save STDERR from worker, worker will use STDERR on crash.
                            processInvoker.ErrorDataReceived += delegate (object sender, ProcessDataReceivedEventArgs stderr)
                            {
                                if (!string.IsNullOrEmpty(stderr.Data))
                                {
                                    lock (outputLock)
                                    {
                                        workerOutput.Add(stderr.Data);
                                    }
                                }
                            };

                            // Start the child process.
                            HostContext.WritePerfCounter("StartingWorkerProcess");
                            var assemblyDirectory = HostContext.GetDirectory(WellKnownDirectory.Bin);
                            string workerFileName = Path.Combine(assemblyDirectory, _workerProcessName);
                            workerProcessTask = processInvoker.ExecuteAsync(
                                workingDirectory: assemblyDirectory,
                                fileName: workerFileName,
                                arguments: "spawnclient " + pipeHandleOut + " " + pipeHandleIn,
                                environment: null,
                                requireExitCodeZero: false,
                                outputEncoding: null,
                                killProcessOnCancel: true,
                                redirectStandardIn: null,
                                inheritConsoleHandler: false,
                                keepStandardInOpen: false,
                                highPriorityProcess: true,
                                cancellationToken: workerProcessCancelTokenSource.Token);
                        }
                    );

                    // Send the job request message.
                    // Kill the worker process if sending the job message times out. The worker
                    // process may have successfully received the job message.
                    try
                    {
                        var body = JsonUtility.ToString(message);
                        // Size is only used for the trace line below (UTF-16 byte count).
                        var numBytes = System.Text.Encoding.Unicode.GetByteCount(body) / 1024;
                        string numBytesString = numBytes > 0 ? $"{numBytes} KB" : " < 1 KB";
                        Trace.Info($"Send job request message to worker for job {message.JobId} ({numBytesString}).");
                        HostContext.WritePerfCounter($"AgentSendingJobToWorker_{message.JobId}");
                        var stopWatch = Stopwatch.StartNew();
                        using (var csSendJobRequest = new CancellationTokenSource(_channelTimeout))
                        {
                            await processChannel.SendAsync(
                                messageType: MessageType.NewJobRequest,
                                body: body,
                                cancellationToken: csSendJobRequest.Token);
                        }
                        stopWatch.Stop();
                        Trace.Info($"Took {stopWatch.ElapsedMilliseconds} ms to send job message to worker");
                    }
                    catch (OperationCanceledException)
                    {
                        // message send been cancelled.
                        // timeout 30 sec. kill worker.
                        Trace.Info($"Job request message sending for job {message.JobId} been cancelled after waiting for {_channelTimeout.TotalSeconds} seconds, kill running worker.");
                        workerProcessCancelTokenSource.Cancel();
                        try
                        {
                            await workerProcessTask;
                        }
                        catch (OperationCanceledException)
                        {
                            Trace.Info("worker process has been killed.");
                        }

                        Trace.Info($"Stop renew job request for job {message.JobId}.");
                        // stop renew lock
                        lockRenewalTokenSource.Cancel();
                        // renew job request should never blow up.
                        await renewJobRequest;

                        // not finish the job request since the job haven't run on worker at all, we will not going to set a result to server.
                        return;
                    }

                    // we get first jobrequest renew succeed and start the worker process with the job message.
                    // send notification to machine provisioner.
                    var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
                    var accessToken = systemConnection?.Authorization?.Parameters["AccessToken"];
                    VariableValue identifier = null;
                    VariableValue definitionId = null;

                    if (message.Plan.PlanType == "Build")
                    {
                        message.Variables.TryGetValue("build.buildId", out identifier);
                        message.Variables.TryGetValue("system.definitionId", out definitionId);
                    }
                    else if (message.Plan.PlanType == "Release")
                    {
                        message.Variables.TryGetValue("release.deploymentId", out identifier);
                        message.Variables.TryGetValue("release.definitionId", out definitionId);
                    }

                    // systemConnection may be null (SingleOrDefault). Dereferencing it here would throw after the
                    // worker has started and before the try/finally below, orphaning the worker and the renewal loop.
                    await notification.JobStarted(message.JobId, accessToken, systemConnection?.Url, message.Plan.PlanId, (identifier?.Value ?? "0"), (definitionId?.Value ?? "0"), message.Plan.PlanType);

                    HostContext.WritePerfCounter($"SentJobToWorker_{requestId}");

                    try
                    {
                        TaskResult resultOnAbandonOrCancel = TaskResult.Succeeded;
                        // wait for renewlock, worker process or cancellation token been fired.
                        // keep listening iff we receive a metadata update
                        bool keepListening = true;
                        while (keepListening)
                        {
                            var metadataUpdateTask = newJobDispatch.MetadataSource.Task;
                            var completedTask = await Task.WhenAny(renewJobRequest, workerProcessTask, Task.Delay(-1, jobRequestCancellationToken), metadataUpdateTask);
                            if (completedTask == workerProcessTask)
                            {
                                keepListening = false;
                                // worker finished successfully, complete job request with result, attach unhandled exception reported by worker, stop renew lock, job has finished.
                                int returnCode = await workerProcessTask;
                                Trace.Info($"Worker finished for job {message.JobId}. Code: " + returnCode);

                                string detailInfo = null;
                                if (!TaskResultUtil.IsValidReturnCode(returnCode))
                                {
                                    // Read under the same lock the output callbacks write under.
                                    lock (outputLock)
                                    {
                                        detailInfo = string.Join(Environment.NewLine, workerOutput);
                                    }
                                    Trace.Info($"Return code {returnCode} indicate worker encounter an unhandled exception or app crash, attach worker stdout/stderr to JobRequest result.");
                                    await LogWorkerProcessUnhandledException(message, detailInfo);
                                }

                                TaskResult result = TaskResultUtil.TranslateFromReturnCode(returnCode);
                                Trace.Info($"finish job request for job {message.JobId} with result: {result}");
                                term.WriteLine(StringUtil.Loc("JobCompleted", DateTime.UtcNow, message.JobDisplayName, result));

                                Trace.Info($"Stop renew job request for job {message.JobId}.");
                                // stop renew lock
                                lockRenewalTokenSource.Cancel();
                                // renew job request should never blow up.
                                await renewJobRequest;

                                // complete job request
                                await CompleteJobRequestAsync(_poolId, message, lockToken, result, detailInfo);

                                // print out unhandled exception happened in worker after we complete job request.
                                // when we run out of disk space, report back to server has higher priority.
                                if (!string.IsNullOrEmpty(detailInfo))
                                {
                                    Trace.Error("Unhandled exception happened in worker:");
                                    Trace.Error(detailInfo);
                                }

                                return;
                            }
                            else if (completedTask == renewJobRequest)
                            {
                                keepListening = false;
                                resultOnAbandonOrCancel = TaskResult.Abandoned;
                            }
                            else if (completedTask == metadataUpdateTask)
                            {
                                Trace.Info($"Send job metadata update message to worker for job {message.JobId}.");
                                using (var csSendCancel = new CancellationTokenSource(_channelTimeout))
                                {
                                    var body = JsonUtility.ToString(metadataUpdateTask.Result);

                                    try
                                    {
                                        await processChannel.SendAsync(
                                            messageType: MessageType.JobMetadataUpdate,
                                            body: body,
                                            cancellationToken: csSendCancel.Token);
                                    }
                                    catch (OperationCanceledException) when (csSendCancel.IsCancellationRequested)
                                    {
                                        // Letting the timeout escape would leave the worker unsupervised and the lease
                                        // renewal running forever. Keep listening instead: worker exit, renewal failure
                                        // and cancellation are all still handled by this loop.
                                        Trace.Info($"Job metadata update message sending for job {message.JobId} been cancelled after waiting for {_channelTimeout.TotalSeconds} seconds, keep waiting for worker.");
                                    }
                                }
                                newJobDispatch.ResetMetadataSource();
                            }
                            else
                            {
                                keepListening = false;
                                resultOnAbandonOrCancel = TaskResult.Canceled;
                            }
                        }

                        // renew job request completed or job request cancellation token been fired for RunAsync(jobrequestmessage)
                        // cancel worker gracefully first, then kill it after worker cancel timeout
                        try
                        {
                            Trace.Info($"Send job cancellation message to worker for job {message.JobId}.");
                            using (var csSendCancel = new CancellationTokenSource(_channelTimeout))
                            {
                                // Tell the worker why it is being stopped when the agent itself is shutting down.
                                var messageType = MessageType.CancelRequest;
                                if (HostContext.AgentShutdownToken.IsCancellationRequested)
                                {
                                    switch (HostContext.AgentShutdownReason)
                                    {
                                        case ShutdownReason.UserCancelled:
                                            messageType = MessageType.AgentShutdown;
                                            break;
                                        case ShutdownReason.OperatingSystemShutdown:
                                            messageType = MessageType.OperatingSystemShutdown;
                                            break;
                                    }
                                }

                                await processChannel.SendAsync(
                                    messageType: messageType,
                                    body: string.Empty,
                                    cancellationToken: csSendCancel.Token);
                            }
                        }
                        catch (OperationCanceledException)
                        {
                            // message send been cancelled.
                            Trace.Info($"Job cancel message sending for job {message.JobId} been cancelled, kill running worker.");
                            workerProcessCancelTokenSource.Cancel();
                            try
                            {
                                await workerProcessTask;
                            }
                            catch (OperationCanceledException)
                            {
                                Trace.Info("worker process has been killed.");
                            }
                        }

                        // wait worker to exit
                        // if worker doesn't exit within timeout, then kill worker.
                        var exitTask = await Task.WhenAny(workerProcessTask, Task.Delay(-1, workerCancelTimeoutKillToken));

                        // worker haven't exit within cancellation timeout.
                        if (exitTask != workerProcessTask)
                        {
                            Trace.Info($"worker process for job {message.JobId} haven't exit within cancellation timout, kill running worker.");
                            workerProcessCancelTokenSource.Cancel();
                            try
                            {
                                await workerProcessTask;
                            }
                            catch (OperationCanceledException)
                            {
                                Trace.Info("worker process has been killed.");
                            }
                        }

                        Trace.Info($"finish job request for job {message.JobId} with result: {resultOnAbandonOrCancel}");
                        term.WriteLine(StringUtil.Loc("JobCompleted", DateTime.UtcNow, message.JobDisplayName, resultOnAbandonOrCancel));
                        // complete job request with cancel result, stop renew lock, job has finished.

                        Trace.Info($"Stop renew job request for job {message.JobId}.");
                        // stop renew lock
                        lockRenewalTokenSource.Cancel();
                        // renew job request should never blow up.
                        await renewJobRequest;

                        // complete job request
                        await CompleteJobRequestAsync(_poolId, message, lockToken, resultOnAbandonOrCancel);
                    }
                    catch (AggregateException e)
                    {
                        ExceptionsUtil.HandleAggregateException(e, Trace.Error);
                    }
                    finally
                    {
                        // This should be the last thing to run so we don't notify external parties until actually finished
                        await notification.JobCompleted(message.JobId);
                    }
                }
            }
        }