private async Task ExecutePipStep()

in Public/Src/Engine/Scheduler/Scheduler.cs [4028:4701]


        private async Task<PipExecutionStep> ExecutePipStep(RunnablePip runnablePip)
        {
            Contract.Requires(runnablePip != null);
            Contract.Requires(runnablePip.Step != PipExecutionStep.Done && runnablePip.Step != PipExecutionStep.None);

            ProcessRunnablePip processRunnable = runnablePip as ProcessRunnablePip;
            var pipId = runnablePip.PipId;
            var pipType = runnablePip.PipType;
            var loggingContext = runnablePip.LoggingContext;
            var operationContext = runnablePip.OperationContext;
            var environment = runnablePip.Environment;
            var fileContentManager = environment.State.FileContentManager;
            var step = runnablePip.Step;
            var worker = runnablePip.Worker;

            // If schedule is terminating (e.g., StopOnFirstFailure), cancel the pip
            // as long as (i) 'start' step has been executed, (ii) the pip is in running state, and (iii) the pip has not been cancelled before.
            if (ShouldCancelPip(runnablePip))
            {
                return runnablePip.Cancel();
            }

            switch (step)
            {
                case PipExecutionStep.Start:
                {
                    var state = TryStartPip(runnablePip);
                    if (state == PipState.Skipped)
                    {
                        return PipExecutionStep.Skip;
                    }

                    Contract.Assert(state == PipState.Running, I($"Cannot start pip in state: {state}"));

                    if (pipType.IsMetaPip())
                    {
                        return PipExecutionStep.ExecuteNonProcessPip;
                    }

                    using (operationContext.StartOperation(PipExecutorCounter.HashSourceFileDependenciesDuration))
                    {
                        // Hash source file dependencies
                        var maybeHashed = await fileContentManager.TryHashSourceDependenciesAsync(runnablePip.Pip, operationContext);
                        if (!maybeHashed.Succeeded)
                        {
                            Logger.Log.PipFailedDueToSourceDependenciesCannotBeHashed(
                                loggingContext,
                                runnablePip.Description);
                            return runnablePip.SetPipResult(PipResultStatus.Failed);
                        }
                    }

                    // For module affinity, we need to set the preferred worker id.
                    // This is intentionally put here after we hydrate the pip for the first time when accessing
                    // runnablePip.Pip above for hashing dependencies.
                    if (runnablePip.Pip.Provenance.ModuleId.IsValid &&
                        m_moduleWorkerMapping.TryGetValue(runnablePip.Pip.Provenance.ModuleId, out var tuple) &&
                        tuple.Workers.Count > 0)
                    {
                        runnablePip.PreferredWorkerId = (int)tuple.Workers[0].WorkerId;
                    }

                    switch (pipType)
                    {
                        case PipType.Process:
                            if (processRunnable.Process.IsStartOrShutdownKind)
                            {
                                // Service start and shutdown pips are noop in the scheduler.
                                // They will be run on demand by the service manager which is not tracked directly by the scheduler.
                                return runnablePip.SetPipResult(PipResult.CreateWithPointPerformanceInfo(PipResultStatus.Succeeded));
                            }

                            break;
                        case PipType.Ipc:

                            // IPC pips go to ChooseWorker before checking the incremental state
                            return PipExecutionStep.ChooseWorkerCpu;
                    }

                    return PipExecutionStep.CheckIncrementalSkip; // CopyFile, WriteFile, Process, SealDirectory pips

                }

                case PipExecutionStep.Cancel:
                {
                    // Make sure shared opaque outputs are flagged as such.
                    FlagSharedOpaqueOutputsOnCancellation(runnablePip);

                    Logger.Log.ScheduleCancelingPipSinceScheduleIsTerminating(
                        loggingContext,
                        runnablePip.Description);
                    return runnablePip.SetPipResult(PipResult.CreateWithPointPerformanceInfo(PipResultStatus.Canceled));
                }

                case PipExecutionStep.Skip:
                {
                    // We report skipped pips when all dependencies (failed or otherwise) complete.
                    // This has the side-effect that stack depth is bounded when a pip fails; ReportSkippedPip
                    // reports failure which is then handled in OnPipCompleted as part of the normal queue processing
                    // (rather than recursively abandoning dependents here).
                    LogEventWithPipProvenance(runnablePip, Logger.Log.SchedulePipFailedDueToFailedPrerequisite);
                    return runnablePip.SetPipResult(PipResult.Skipped);
                }

                case PipExecutionStep.MaterializeOutputs:
                {
                    m_materializeOutputsQueued = true;
                    PipResultStatus materializationResult = await worker.MaterializeOutputsAsync(runnablePip);

                    var nextStep = processRunnable?.ExecutionResult != null
                        ? processRunnable.SetPipResult(processRunnable.ExecutionResult.CloneSealedWithResult(materializationResult))
                        : runnablePip.SetPipResult(materializationResult);

                    if (!MaterializeOutputsInBackground)
                    {
                        return nextStep;
                    }

                    if (materializationResult.IndicatesFailure())
                    {
                        m_hasFailures = true;
                    }
                    else
                    {
                        IncrementalSchedulingState?.PendingUpdates.MarkNodeMaterialized(runnablePip.PipId.ToNodeId());
                        Logger.Log.PipIsMarkedMaterialized(loggingContext, runnablePip.Description);
                    }

                    return PipExecutionStep.Done;
                }

                case PipExecutionStep.CheckIncrementalSkip:
                {
                    // Enable incremental scheduling when distributed build role is none, and
                    // dirty build is not used (forceSkipDependencies is false).
                    if (IsPipCleanMaterialized(pipId))
                    {
                        var maybeHashed = await fileContentManager.TryHashOutputsAsync(runnablePip.Pip, operationContext);
                        if (!maybeHashed.Succeeded)
                        {
                            if (maybeHashed.Failure is CancellationFailure)
                            {
                                Contract.Assert(loggingContext.ErrorWasLogged);
                            }
                            else
                            {
                                Logger.Log.PipFailedDueToOutputsCannotBeHashed(
                                    loggingContext,
                                    runnablePip.Description);
                            }
                        }
                        else
                        {
                            PipExecutionCounters.IncrementCounter(PipExecutorCounter.IncrementalSkipPipDueToCleanMaterialized);

                            if (runnablePip.Pip.PipType == PipType.Process)
                            {
                                PipExecutionCounters.IncrementCounter(PipExecutorCounter.IncrementalSkipProcessDueToCleanMaterialized);
                            }

                            Logger.Log.PipIsIncrementallySkippedDueToCleanMaterialized(loggingContext, runnablePip.Description);
                        }

                        return runnablePip.SetPipResult(PipResult.Create(
                            maybeHashed.Succeeded ? PipResultStatus.UpToDate : PipResultStatus.Failed,
                            runnablePip.StartTime));
                    }

                    if (m_scheduleConfiguration.ForceSkipDependencies != ForceSkipDependenciesMode.Disabled && m_mustExecuteNodesForDirtyBuild != null)
                    {
                        if (!m_mustExecuteNodesForDirtyBuild.Contains(pipId.ToNodeId()))
                        {
                            // When dirty build is enabled, we skip the scheduled pips whose outputs are present and are in the transitive dependency chain
                            // The skipped ones during execution are not explicitly scheduled pips at all.
                            return runnablePip.SetPipResult(PipResult.Create(
                                PipResultStatus.UpToDate,
                                runnablePip.StartTime));
                        }

                        using (PipExecutionCounters.StartStopwatch(PipExecutorCounter.HashProcessDependenciesDuration))
                        {
                            // The dependencies may have been skipped, so hash the processes inputs
                            var maybeHashed = await fileContentManager.TryHashDependenciesAsync(runnablePip.Pip, operationContext);
                            if (!maybeHashed.Succeeded)
                            {
                                if (!(maybeHashed.Failure is CancellationFailure))
                                {
                                    Logger.Log.PipFailedDueToDependenciesCannotBeHashed(
                                        loggingContext,
                                        runnablePip.Description);
                                }

                                return runnablePip.SetPipResult(PipResultStatus.Failed);
                            }
                        }
                    }

                    if (pipType == PipType.Process)
                    {
                        return m_configuration.Schedule.DelayedCacheLookupEnabled() ? PipExecutionStep.DelayedCacheLookup : PipExecutionStep.ChooseWorkerCacheLookup;
                    }
                    else
                    {
                        return PipExecutionStep.ExecuteNonProcessPip;
                    }
                }

                case PipExecutionStep.DelayedCacheLookup:
                {
                    return PipExecutionStep.ChooseWorkerCacheLookup;
                }

                case PipExecutionStep.ChooseWorkerCacheLookup:
                {
                    Contract.Assert(pipType == PipType.Process);
                    Contract.Assert(worker == null);

                    worker = await m_chooseWorkerCacheLookup.ChooseWorkerAsync(runnablePip);
                    if (worker == null)
                    {
                        // If none of the workers is available, enqueue again.
                        // We always want to choose a worker for the highest priority item. That's why, we enqueue again
                        return PipExecutionStep.ChooseWorkerCacheLookup;
                    }

                    worker.Transition(runnablePip.PipId, WorkerPipState.ChosenForCacheLookup);
                    runnablePip.SetWorker(worker);

                    return PipExecutionStep.CacheLookup;
                }

                case PipExecutionStep.ChooseWorkerCpu:
                {
                    Contract.Assert(pipType == PipType.Process || pipType == PipType.Ipc);
                    Contract.Assert(worker == null);

                    worker = await ChooseWorkerCpuAsync(runnablePip);
                    if (worker == null)
                    {
                        // If none of the workers is available, enqueue again.
                        // We always want to choose a worker for the highest priority item. That's why, we enqueue again
                        return PipExecutionStep.ChooseWorkerCpu;
                    }

                    worker.Transition(runnablePip.PipId, WorkerPipState.ChosenForExecution);
                    runnablePip.SetWorker(worker);
                    if (InputsLazilyMaterialized)
                    {
                        // Materialize inputs if lazy materialization is enabled or this is a distributed build
                        return PipExecutionStep.MaterializeInputs;
                    }

                    if (pipType == PipType.Process)
                    {
                        return PipExecutionStep.ExecuteProcess;
                    }

                    Contract.Assert(pipType == PipType.Ipc);
                    return PipExecutionStep.ExecuteNonProcessPip;
                }

                case PipExecutionStep.MaterializeInputs:
                {
                    Contract.Assert(pipType == PipType.Process || pipType == PipType.Ipc);

                    PipResultStatus materializationResult = await worker.MaterializeInputsAsync(runnablePip);
                    if (materializationResult.IndicatesFailure())
                    {
                        return runnablePip.SetPipResult(materializationResult);
                    }

                    worker.OnInputMaterializationCompletion(runnablePip.Pip, this);

                    return pipType == PipType.Process ?
                        PipExecutionStep.ExecuteProcess :
                        PipExecutionStep.ExecuteNonProcessPip;
                }

                case PipExecutionStep.ExecuteNonProcessPip:
                {
                    var pipResult = await ExecuteNonProcessPipAsync(runnablePip);

                    if (runnablePip.PipType == PipType.Ipc && runnablePip.Worker?.IsRemote == true)
                    {
                        PipExecutionCounters.IncrementCounter(PipExecutorCounter.IpcPipsExecutedRemotely);
                    }

                    return runnablePip.SetPipResult(pipResult);
                }

                case PipExecutionStep.CacheLookup:
                {
                    Contract.Assert(processRunnable != null);
                    Contract.Assert(worker != null);

                    var process = processRunnable.Process;
                    var pipScope = State.GetScope(process);
                    var cacheableProcess = pipScope.GetCacheableProcess(process, environment);

                    var tupleResult = await worker.CacheLookupAsync(
                        processRunnable,
                        pipScope,
                        cacheableProcess);

                    var cacheResult = tupleResult.Item1;
                    if (cacheResult == null)
                    {
                        Contract.Assert(tupleResult.Item2 == PipResultStatus.Canceled || loggingContext.ErrorWasLogged, "Error should have been logged for dependency pip.");
                        return processRunnable.SetPipResult(tupleResult.Item2);
                    }

                    if (!m_configuration.Cache.DisableDeterminismProbeLogging)
                    {
                        HandleDeterminismProbe(loggingContext, environment, cacheResult, runnablePip.Description);
                    }

                    processRunnable.SetCacheableProcess(cacheableProcess);
                    processRunnable.SetCacheResult(cacheResult);

                    using (operationContext.StartOperation(PipExecutorCounter.ReportRemoteMetadataAndPathSetDuration))
                    {
                        // It only executes on orchestrator; but we still acquire the slot on the worker.
                        if (cacheResult.CanRunFromCache && worker.IsRemote)
                        {
                            var cacheHitData = cacheResult.GetCacheHitData();
                            m_pipTwoPhaseCache.ReportRemoteMetadataAndPathSet(
                                cacheHitData.Metadata,
                                cacheHitData.MetadataHash,
                                cacheHitData.PathSet,
                                cacheHitData.PathSetHash,
                                cacheResult.WeakFingerprint,
                                cacheHitData.StrongFingerprint,
                                isExecution: false,
                                process.PreservePathSetCasing);
                        }
                    }

                    if (cacheResult.CanRunFromCache)
                    {
                        // Always execute the process if the determinism probe is enabled.
                        // Pips that must be run due to non-determinism are NOT counted as cache misses.
                        if (!m_configuration.Cache.DeterminismProbe)
                        {
                            return PipExecutionStep.RunFromCache;
                        }
                    }
                    else if (m_configuration.Schedule.CacheOnly)
                    {
                        // CacheOnly mode only wants to perform cache lookups and skip execution for pips that are misses
                        environment.Counters.IncrementCounter(PipExecutorCounter.ProcessPipsSkippedExecutionDueToCacheOnly);
                        PipRuntimeInfo pipRuntimeInfo = GetPipRuntimeInfo(pipId);
                        pipRuntimeInfo.Transition(m_pipStateCounters, pipType, PipState.Skipped);
                        return PipExecutionStep.Skip;
                    }
                    else
                    {
                        environment.Counters.IncrementCounter(PipExecutorCounter.ProcessPipsExecutedDueToCacheMiss);
                    }

                    return PipExecutionStep.ChooseWorkerCpu;
                }

                case PipExecutionStep.RunFromCache:
                {
                    Contract.Assert(processRunnable != null);

                    Process process = (Process)processRunnable.Pip;
                    var pipScope = State.GetScope(process);
                    var executionResult = await PipExecutor.RunFromCacheWithWarningsAsync(operationContext, environment, pipScope, process, processRunnable.CacheResult, processRunnable.Description);

                    return processRunnable.SetPipResult(executionResult);
                }

                case PipExecutionStep.ExecuteProcess:
                {
                    MarkPipStartExecuting();

                    if (processRunnable.Weight > 1)
                    {
                        // Only log for pips with non-standard process weights
                        Logger.Log.ProcessPipProcessWeight(loggingContext, processRunnable.Description, processRunnable.Weight);
                    }

                    processRunnable.Executed = true;

                    var executionResult = await worker.ExecuteProcessAsync(processRunnable);

                    // Don't count service pips in process pip counters
                    if (!processRunnable.Process.IsStartOrShutdownKind && executionResult.PerformanceInformation != null)
                    {
                        var perfInfo = executionResult.PerformanceInformation;

                        try
                        {
                            m_groupedPipCounters.AddToCounters(processRunnable.Process,
                                new[]
                                {
                                        (PipCountersByGroup.IOReadBytes,  (long) perfInfo.IO.ReadCounters.TransferCount),
                                        (PipCountersByGroup.IOWriteBytes, (long) perfInfo.IO.WriteCounters.TransferCount)
                                },
                                new[] { (PipCountersByGroup.ExecuteProcessDuration, perfInfo.ProcessExecutionTime) }
                            );
                        }
                        catch (OverflowException ex)
                        {
                            Logger.Log.ExecutePipStepOverflowFailure(operationContext, ex.Message);

                            m_groupedPipCounters.AddToCounters(processRunnable.Process,
                                new[] { (PipCountersByGroup.IOReadBytes, 0L), (PipCountersByGroup.IOWriteBytes, 0L) },
                                new[] { (PipCountersByGroup.ExecuteProcessDuration, perfInfo.ProcessExecutionTime) }
                            );
                        }
                    }

                    // The pip was canceled due to retryable failure
                    if (executionResult.Result == PipResultStatus.Canceled && !IsTerminating)
                    {
                        Contract.Assert(executionResult.RetryInfo != null, $"Retry Information is required for all retry cases. IsTerminating: {m_scheduleTerminating}");
                        RetryReason retryReason = executionResult.RetryInfo.RetryReason;

                        if (worker.IsLocal)
                        {
                            // Because the scheduler will re-run this pip, we have to nuke all outputs created under shared opaque directories
                            var sharedOpaqueOutputs = FlagAndReturnScrubbableSharedOpaqueOutputs(environment, processRunnable);
                            ScrubSharedOpaqueOutputs(sharedOpaqueOutputs);
                        }

                        // If it is a single machine or distributed build orchestrator
                        if (!IsDistributedBuild || IsDistributedOrchestrator)
                        {
                            if (retryReason == RetryReason.ResourceExhaustion)
                            {
                                // Use the max of the observed memory and the worker's expected memory (multiplied with 1.25 to increase the expectations) for the pip
                                var expectedCounters = processRunnable.ExpectedMemoryCounters.Value;
                                var actualCounters = executionResult.PerformanceInformation?.MemoryCounters;
                                processRunnable.ExpectedMemoryCounters = ProcessMemoryCounters.CreateFromMb(
                                    peakWorkingSetMb: Math.Max((int)(expectedCounters.PeakWorkingSetMb * 1.25), actualCounters?.PeakWorkingSetMb ?? 0),
                                    averageWorkingSetMb: Math.Max((int)(expectedCounters.AverageWorkingSetMb * 1.25), actualCounters?.AverageWorkingSetMb ?? 0),
                                    peakCommitSizeMb: Math.Max((int)(expectedCounters.PeakCommitSizeMb * 1.25), actualCounters?.PeakCommitSizeMb ?? 0),
                                    averageCommitSizeMb: Math.Max((int)(expectedCounters.AverageCommitSizeMb * 1.25), actualCounters?.AverageCommitSizeMb ?? 0));

                                if (m_scheduleConfiguration.MaxRetriesDueToLowMemory.HasValue &&
                                    processRunnable.Performance.RetryCountDueToLowMemory == m_scheduleConfiguration.MaxRetriesDueToLowMemory)
                                {
                                    Logger.Log.ExcessivePipRetriesDueToLowMemory(operationContext, processRunnable.Description, processRunnable.Performance.RetryCountDueToLowMemory);
                                    return runnablePip.SetPipResult(PipResultStatus.Failed);
                                }
                                else
                                {
                                    Logger.Log.PipRetryDueToLowMemory(operationContext, processRunnable.Description, worker.DefaultWorkingSetMbPerProcess, expectedCounters.PeakWorkingSetMb, actualCounters?.PeakWorkingSetMb ?? 0);
                                }
                            }
                            else if (retryReason.IsPreProcessExecOrRemotingInfraFailure())
                            {
                                if (processRunnable.Performance.RetryCountDueToRetryableFailures == m_scheduleConfiguration.MaxRetriesDueToRetryableFailures)
                                {
                                    Logger.Log.ExcessivePipRetriesDueToRetryableFailures(operationContext, processRunnable.Description,
                                        processRunnable.Performance.RetryCountDueToRetryableFailures, executionResult.RetryInfo.RetryReason.ToString());
                                    return runnablePip.SetPipResult(PipResultStatus.Failed);
                                }
                                else
                                {
                                    Logger.Log.PipRetryDueToRetryableFailures(operationContext, processRunnable.Description, retryReason.ToString());
                                    if (retryReason == RetryReason.RemoteFallback)
                                    {
                                        // Force local execution.
                                        processRunnable.RunLocation = ProcessRunLocation.Local;
                                    }
                                }
                            }
                        }

                        return processRunnable.SetPipResult(executionResult.Result);
                    }

                    m_pipPropertyInfo.UpdatePipPropertyInfo(processRunnable, executionResult);
                    m_pipRetryInfo.UpdatePipRetryInfo(processRunnable, executionResult, PipExecutionCounters);

                    if (runnablePip.Worker?.IsRemote == true)
                    {
                        PipExecutionCounters.IncrementCounter(PipExecutorCounter.ProcessesExecutedRemotely);
                    }

                    if (m_configuration.Cache.DeterminismProbe && processRunnable.CacheResult.CanRunFromCache)
                    {
                        if (m_configuration.Cache.DisableDeterminismProbeLogging)
                        {
                            // Don't check determinism probe
                            return PipExecutionStep.RunFromCache;
                        }

                        // Compare strong fingerprints between execution and cache hit for determinism probe
                        return CheckMatchForDeterminismProbe(processRunnable);
                    }

                    return PipExecutionStep.PostProcess;
                }

                case PipExecutionStep.PostProcess:
                {
                    var executionResult = processRunnable.ExecutionResult;

                    if (executionResult.PerformanceInformation != null)
                    {
                        var perfInfo = executionResult.PerformanceInformation;
                        m_perPipPerformanceInfoStore.AddPip(new PerProcessPipPerformanceInformation(
                            ref processRunnable,
                            (int)perfInfo.ProcessExecutionTime.TotalMilliseconds,
                            perfInfo.MemoryCounters.PeakWorkingSetMb,
                            (int)Math.Ceiling(perfInfo.IO.ReadCounters.TransferCount / BytesInMb),
                            (int)Math.Ceiling(perfInfo.IO.WriteCounters.TransferCount / BytesInMb)));
                    }

                    // Make sure all shared outputs are flagged as such.
                    // We need to do this even if the pip failed, so any writes under shared opaques are flagged anyway.
                    // This allows the scrubber to remove those files as well in the next run.
                    var start = DateTime.UtcNow;
                    var sharedOpaqueOutputs = FlagAndReturnScrubbableSharedOpaqueOutputs(environment, processRunnable);
                    LogSubPhaseDuration(operationContext, runnablePip.Pip, SandboxedProcessFactory.SandboxedProcessCounters.SchedulerPhaseFlaggingSharedOpaqueOutputs, DateTime.UtcNow.Subtract(start), $"(count: {sharedOpaqueOutputs.Count})");

                    // Set the process as executed. NOTE: We do this here rather than during ExecuteProcess to handle
                    // case of processes executed remotely
                    var pipScope = State.GetScope(processRunnable.Process);

                    bool pipIsSafeToCache = true;

                    IReadOnlyDictionary<FileArtifact, (FileMaterializationInfo, ReportedViolation)> allowedSameContentViolations = null;

                    if (!IsDistributedWorker)
                    {
                        var expectedMemoryCounters = processRunnable.ExpectedMemoryCounters.Value;

                        int peakWorkingSetMb = executionResult.PerformanceInformation?.MemoryCounters.PeakWorkingSetMb ?? 0;
                        int averageWorkingSetMb = executionResult.PerformanceInformation?.MemoryCounters.AverageWorkingSetMb ?? 0;
                        int peakCommitSizeMb = executionResult.PerformanceInformation?.MemoryCounters.PeakCommitSizeMb ?? 0;
                        int averageCommitSizeMb = executionResult.PerformanceInformation?.MemoryCounters.AverageCommitSizeMb ?? 0;

                        try
                        {
                            Logger.Log.ProcessPipExecutionInfo(
                                operationContext,
                                runnablePip.Description,
                                executionResult.PerformanceInformation?.NumberOfProcesses ?? 0,
                                (processRunnable.HistoricPerfData?.DurationInMs ?? 0) / 1000.0,
                                executionResult.PerformanceInformation?.ProcessExecutionTime.TotalSeconds ?? 0,
                                executionResult.PerformanceInformation?.ProcessorsInPercents ?? 0,
                                processRunnable.Weight,
                                worker.DefaultWorkingSetMbPerProcess,
                                expectedMemoryCounters.PeakWorkingSetMb,
                                peakWorkingSetMb,
                                expectedMemoryCounters.AverageWorkingSetMb,
                                averageWorkingSetMb,
                                expectedMemoryCounters.PeakCommitSizeMb,
                                peakCommitSizeMb,
                                expectedMemoryCounters.AverageCommitSizeMb,
                                averageCommitSizeMb,
                                (int)(processRunnable.HistoricPerfData?.DiskIOInMB ?? 0),
                                (int)ByteSizeFormatter.ToMegabytes(executionResult.PerformanceInformation?.IO.GetAggregateIO().TransferCount ?? 0));

                            m_totalPeakWorkingSetMb += (ulong)peakWorkingSetMb;
                            m_totalAverageWorkingSetMb += (ulong)averageWorkingSetMb;

                            m_totalPeakCommitSizeMb += (ulong)peakCommitSizeMb;
                            m_totalAverageCommitSizeMb += (ulong)averageCommitSizeMb;
                        }
                        catch (OverflowException ex)
                        {
                            Logger.Log.ExecutePipStepOverflowFailure(operationContext, ex.Message);
                        }

                        // File violation analysis needs to happen on the orchestrator as it relies on
                        // graph-wide data such as detecting duplicate
                        start = DateTime.UtcNow;
                        executionResult = PipExecutor.AnalyzeFileAccessViolations(
                            operationContext,
                            environment,
                            pipScope,
                            executionResult,
                            processRunnable.Process,
                            out pipIsSafeToCache,
                            out allowedSameContentViolations);
                        LogSubPhaseDuration(operationContext, runnablePip.Pip, SandboxedProcessCounters.SchedulerPhaseAnalyzingFileAccessViolations, DateTime.UtcNow.Subtract(start));

                        processRunnable.SetExecutionResult(executionResult);

                        if (executionResult.Result.IndicatesFailure())
                        {
                            // Dependency analysis failure. Bail out before performing post processing. This prevents
                            // the output from being cached as well as downstream pips from being run.
                            return processRunnable.SetPipResult(executionResult);
                        }
                    }

                    if (pipIsSafeToCache)
                    {
                        // The worker should only cache the pip if the violation analyzer allows it to.
                        executionResult = await worker.PostProcessAsync(processRunnable);
                    }
                    else
                    {
                        Logger.Log.ScheduleProcessNotStoredToCacheDueToFileMonitoringViolations(loggingContext, processRunnable.Description);
                    }

                    // If the result converged we should delete shared opaque outputs where the execution happened. On convergence, the result
                    // will be consumed from the already cached pip and the just produced outputs should be absent.
                    if (executionResult.Converged && worker.IsLocal)
                    {
                        ScrubSharedOpaqueOutputs(sharedOpaqueOutputs);
                    }

                    if (!IsDistributedWorker)
                    {
                        m_chooseWorkerCpu.ReportProcessExecutionOutputs(processRunnable, executionResult);

                        // If the cache converged outputs, we need to check for double writes again, since the configured policy may care about
                        // the content of the (final) outputs
                        if (executionResult.Converged)
                        {
                            start = DateTime.UtcNow;
                            executionResult = PipExecutor.AnalyzeDoubleWritesOnCacheConvergence(
                               operationContext,
                               environment,
                               pipScope,
                               executionResult,
                               processRunnable.Process,
                               allowedSameContentViolations);
                            LogSubPhaseDuration(operationContext, runnablePip.Pip, SandboxedProcessCounters.SchedulerPhaseAnalyzingDoubleWrites, DateTime.UtcNow.Subtract(start));

                            processRunnable.SetExecutionResult(executionResult);

                            if (executionResult.Result.IndicatesFailure())
                            {
                                // Dependency analysis failure. Even though the pip is already cached, we got a cache converged event, so
                                // it is safe for other downstream pips to consume the cached result. However, some double writes were found based
                                // on the configured policy, so we fail the build
                                return processRunnable.SetPipResult(executionResult);
                            }
                        }
                    }

                    if (runnablePip.Worker?.IsRemote == true)
                    {
                        m_pipTwoPhaseCache.ReportRemoteMetadataAndPathSet(
                            executionResult.PipCacheDescriptorV2Metadata,
                            executionResult.TwoPhaseCachingInfo?.CacheEntry.MetadataHash,
                            executionResult.PathSet,
                            executionResult.TwoPhaseCachingInfo?.PathSetHash,
                            executionResult.TwoPhaseCachingInfo?.WeakFingerprint,
                            executionResult.TwoPhaseCachingInfo?.StrongFingerprint,
                            isExecution: !executionResult.Converged,
                            preservePathCasing: processRunnable.Process.PreservePathSetCasing);
                    }

                    // Output content is reported here to ensure that it happens both on worker executing PostProcess and
                    // orchestrator which called worker to execute post process.
                    start = DateTime.UtcNow;
                    PipExecutor.ReportExecutionResultOutputContent(
                        operationContext,
                        environment,
                        processRunnable.Pip.SemiStableHash,
                        executionResult,
                        processRunnable.Process.RewritePolicy.ImpliesDoubleWriteIsWarning());
                    LogSubPhaseDuration(operationContext, runnablePip.Pip, SandboxedProcessCounters.SchedulerPhaseReportingOutputContent, DateTime.UtcNow.Subtract(start), $"(num outputs: {executionResult.OutputContent.Length})");
                    return processRunnable.SetPipResult(executionResult);
                }

                case PipExecutionStep.HandleResult:
                    await OnPipCompleted(runnablePip);
                    return PipExecutionStep.Done;

                default:
                    throw Contract.AssertFailure(I($"Do not know how to run this pip step: '{step}'"));
            }
        }