in Public/Src/Engine/Scheduler/Scheduler.cs [4028:4701]
private async Task<PipExecutionStep> ExecutePipStep(RunnablePip runnablePip)
{
Contract.Requires(runnablePip != null);
Contract.Requires(runnablePip.Step != PipExecutionStep.Done && runnablePip.Step != PipExecutionStep.None);
ProcessRunnablePip processRunnable = runnablePip as ProcessRunnablePip;
var pipId = runnablePip.PipId;
var pipType = runnablePip.PipType;
var loggingContext = runnablePip.LoggingContext;
var operationContext = runnablePip.OperationContext;
var environment = runnablePip.Environment;
var fileContentManager = environment.State.FileContentManager;
var step = runnablePip.Step;
var worker = runnablePip.Worker;
// If the schedule is terminating (e.g., StopOnFirstFailure), cancel the pip
// as long as (i) the 'start' step has been executed, (ii) the pip is in the Running state, and (iii) the pip has not been cancelled before.
if (ShouldCancelPip(runnablePip))
{
return runnablePip.Cancel();
}
switch (step)
{
case PipExecutionStep.Start:
{
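// Start: transition the pip to Running (or Skip it), hash its source-file dependencies,
// set the module-affinity worker preference, and route to the next step based on pip type.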
var state = TryStartPip(runnablePip);
if (state == PipState.Skipped)
{
return PipExecutionStep.Skip;
}
Contract.Assert(state == PipState.Running, I($"Cannot start pip in state: {state}"));
if (pipType.IsMetaPip())
{
return PipExecutionStep.ExecuteNonProcessPip;
}
using (operationContext.StartOperation(PipExecutorCounter.HashSourceFileDependenciesDuration))
{
// Hash source file dependencies
var maybeHashed = await fileContentManager.TryHashSourceDependenciesAsync(runnablePip.Pip, operationContext);
if (!maybeHashed.Succeeded)
{
Logger.Log.PipFailedDueToSourceDependenciesCannotBeHashed(
loggingContext,
runnablePip.Description);
return runnablePip.SetPipResult(PipResultStatus.Failed);
}
}
// For module affinity, we need to set the preferred worker id.
// This is intentionally done here, after the pip has been hydrated for the first time by accessing
// runnablePip.Pip above for hashing dependencies.
if (runnablePip.Pip.Provenance.ModuleId.IsValid &&
m_moduleWorkerMapping.TryGetValue(runnablePip.Pip.Provenance.ModuleId, out var tuple) &&
tuple.Workers.Count > 0)
{
runnablePip.PreferredWorkerId = (int)tuple.Workers[0].WorkerId;
}
switch (pipType)
{
case PipType.Process:
if (processRunnable.Process.IsStartOrShutdownKind)
{
// Service start and shutdown pips are no-ops in the scheduler.
// They will be run on demand by the service manager, which is not tracked directly by the scheduler.
return runnablePip.SetPipResult(PipResult.CreateWithPointPerformanceInfo(PipResultStatus.Succeeded));
}
break;
case PipType.Ipc:
// IPC pips go to ChooseWorker before checking the incremental state
return PipExecutionStep.ChooseWorkerCpu;
}
return PipExecutionStep.CheckIncrementalSkip; // CopyFile, WriteFile, Process, SealDirectory pips
}
case PipExecutionStep.Cancel:
{
// Make sure shared opaque outputs are flagged as such.
FlagSharedOpaqueOutputsOnCancellation(runnablePip);
Logger.Log.ScheduleCancelingPipSinceScheduleIsTerminating(
loggingContext,
runnablePip.Description);
return runnablePip.SetPipResult(PipResult.CreateWithPointPerformanceInfo(PipResultStatus.Canceled));
}
case PipExecutionStep.Skip:
{
// We report skipped pips when all dependencies (failed or otherwise) complete.
// This has the side-effect that stack depth is bounded when a pip fails; ReportSkippedPip
// reports failure which is then handled in OnPipCompleted as part of the normal queue processing
// (rather than recursively abandoning dependents here).
LogEventWithPipProvenance(runnablePip, Logger.Log.SchedulePipFailedDueToFailedPrerequisite);
return runnablePip.SetPipResult(PipResult.Skipped);
}
case PipExecutionStep.MaterializeOutputs:
{
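// Materialize the pip's outputs on this worker. When outputs are materialized in the background,
// failures are recorded and successful materialization is marked for incremental scheduling before this step completes.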
m_materializeOutputsQueued = true;
PipResultStatus materializationResult = await worker.MaterializeOutputsAsync(runnablePip);
var nextStep = processRunnable?.ExecutionResult != null
? processRunnable.SetPipResult(processRunnable.ExecutionResult.CloneSealedWithResult(materializationResult))
: runnablePip.SetPipResult(materializationResult);
if (!MaterializeOutputsInBackground)
{
return nextStep;
}
if (materializationResult.IndicatesFailure())
{
m_hasFailures = true;
}
else
{
IncrementalSchedulingState?.PendingUpdates.MarkNodeMaterialized(runnablePip.PipId.ToNodeId());
Logger.Log.PipIsMarkedMaterialized(loggingContext, runnablePip.Description);
}
return PipExecutionStep.Done;
}
case PipExecutionStep.CheckIncrementalSkip:
{
// Incremental scheduling is enabled only when the distributed build role is None and
// dirty build is not used (forceSkipDependencies is false).
if (IsPipCleanMaterialized(pipId))
{
var maybeHashed = await fileContentManager.TryHashOutputsAsync(runnablePip.Pip, operationContext);
if (!maybeHashed.Succeeded)
{
if (maybeHashed.Failure is CancellationFailure)
{
Contract.Assert(loggingContext.ErrorWasLogged);
}
else
{
Logger.Log.PipFailedDueToOutputsCannotBeHashed(
loggingContext,
runnablePip.Description);
}
}
else
{
PipExecutionCounters.IncrementCounter(PipExecutorCounter.IncrementalSkipPipDueToCleanMaterialized);
if (runnablePip.Pip.PipType == PipType.Process)
{
PipExecutionCounters.IncrementCounter(PipExecutorCounter.IncrementalSkipProcessDueToCleanMaterialized);
}
Logger.Log.PipIsIncrementallySkippedDueToCleanMaterialized(loggingContext, runnablePip.Description);
}
return runnablePip.SetPipResult(PipResult.Create(
maybeHashed.Succeeded ? PipResultStatus.UpToDate : PipResultStatus.Failed,
runnablePip.StartTime));
}
if (m_scheduleConfiguration.ForceSkipDependencies != ForceSkipDependenciesMode.Disabled && m_mustExecuteNodesForDirtyBuild != null)
{
if (!m_mustExecuteNodesForDirtyBuild.Contains(pipId.ToNodeId()))
{
// When dirty build is enabled, we skip scheduled pips whose outputs are present and which are in the transitive dependency chain.
// Pips skipped during execution were never explicitly scheduled at all.
return runnablePip.SetPipResult(PipResult.Create(
PipResultStatus.UpToDate,
runnablePip.StartTime));
}
using (PipExecutionCounters.StartStopwatch(PipExecutorCounter.HashProcessDependenciesDuration))
{
// The dependencies may have been skipped, so hash the process's inputs
var maybeHashed = await fileContentManager.TryHashDependenciesAsync(runnablePip.Pip, operationContext);
if (!maybeHashed.Succeeded)
{
if (!(maybeHashed.Failure is CancellationFailure))
{
Logger.Log.PipFailedDueToDependenciesCannotBeHashed(
loggingContext,
runnablePip.Description);
}
return runnablePip.SetPipResult(PipResultStatus.Failed);
}
}
}
if (pipType == PipType.Process)
{
return m_configuration.Schedule.DelayedCacheLookupEnabled() ? PipExecutionStep.DelayedCacheLookup : PipExecutionStep.ChooseWorkerCacheLookup;
}
else
{
return PipExecutionStep.ExecuteNonProcessPip;
}
}
case PipExecutionStep.DelayedCacheLookup:
{
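// Once dispatched, a delayed cache lookup simply proceeds to choosing a worker for cache lookup.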
return PipExecutionStep.ChooseWorkerCacheLookup;
}
case PipExecutionStep.ChooseWorkerCacheLookup:
{
Contract.Assert(pipType == PipType.Process);
Contract.Assert(worker == null);
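// Try to acquire a cache-lookup slot on a worker; re-enqueue this step if none is currently available.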
worker = await m_chooseWorkerCacheLookup.ChooseWorkerAsync(runnablePip);
if (worker == null)
{
// If no worker is available, enqueue again.
// We always want to choose a worker for the highest-priority item; that is why we re-enqueue.
return PipExecutionStep.ChooseWorkerCacheLookup;
}
worker.Transition(runnablePip.PipId, WorkerPipState.ChosenForCacheLookup);
runnablePip.SetWorker(worker);
return PipExecutionStep.CacheLookup;
}
case PipExecutionStep.ChooseWorkerCpu:
{
Contract.Assert(pipType == PipType.Process || pipType == PipType.Ipc);
Contract.Assert(worker == null);
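// Try to acquire an execution (CPU) slot on a worker; re-enqueue this step if none is currently available.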
worker = await ChooseWorkerCpuAsync(runnablePip);
if (worker == null)
{
// If no worker is available, enqueue again.
// We always want to choose a worker for the highest-priority item; that is why we re-enqueue.
return PipExecutionStep.ChooseWorkerCpu;
}
worker.Transition(runnablePip.PipId, WorkerPipState.ChosenForExecution);
runnablePip.SetWorker(worker);
if (InputsLazilyMaterialized)
{
// Materialize inputs if lazy materialization is enabled or this is a distributed build
return PipExecutionStep.MaterializeInputs;
}
if (pipType == PipType.Process)
{
return PipExecutionStep.ExecuteProcess;
}
Contract.Assert(pipType == PipType.Ipc);
return PipExecutionStep.ExecuteNonProcessPip;
}
case PipExecutionStep.MaterializeInputs:
{
Contract.Assert(pipType == PipType.Process || pipType == PipType.Ipc);
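// Materialize the pip's declared inputs on the chosen worker before execution; a materialization failure fails the pip.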
PipResultStatus materializationResult = await worker.MaterializeInputsAsync(runnablePip);
if (materializationResult.IndicatesFailure())
{
return runnablePip.SetPipResult(materializationResult);
}
worker.OnInputMaterializationCompletion(runnablePip.Pip, this);
return pipType == PipType.Process ?
PipExecutionStep.ExecuteProcess :
PipExecutionStep.ExecuteNonProcessPip;
}
case PipExecutionStep.ExecuteNonProcessPip:
{
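// Executes CopyFile, WriteFile, SealDirectory, IPC, and meta pips.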
var pipResult = await ExecuteNonProcessPipAsync(runnablePip);
if (runnablePip.PipType == PipType.Ipc && runnablePip.Worker?.IsRemote == true)
{
PipExecutionCounters.IncrementCounter(PipExecutorCounter.IpcPipsExecutedRemotely);
}
return runnablePip.SetPipResult(pipResult);
}
case PipExecutionStep.CacheLookup:
{
Contract.Assert(processRunnable != null);
Contract.Assert(worker != null);
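// Perform the two-phase cache lookup on the chosen worker: run from cache on a hit
// (unless the determinism probe forces execution), execute on a miss, or skip in cache-only mode.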
var process = processRunnable.Process;
var pipScope = State.GetScope(process);
var cacheableProcess = pipScope.GetCacheableProcess(process, environment);
var tupleResult = await worker.CacheLookupAsync(
processRunnable,
pipScope,
cacheableProcess);
var cacheResult = tupleResult.Item1;
if (cacheResult == null)
{
Contract.Assert(tupleResult.Item2 == PipResultStatus.Canceled || loggingContext.ErrorWasLogged, "Error should have been logged for dependency pip.");
return processRunnable.SetPipResult(tupleResult.Item2);
}
if (!m_configuration.Cache.DisableDeterminismProbeLogging)
{
HandleDeterminismProbe(loggingContext, environment, cacheResult, runnablePip.Description);
}
processRunnable.SetCacheableProcess(cacheableProcess);
processRunnable.SetCacheResult(cacheResult);
using (operationContext.StartOperation(PipExecutorCounter.ReportRemoteMetadataAndPathSetDuration))
{
// This only executes on the orchestrator, but we still acquire the slot on the worker.
if (cacheResult.CanRunFromCache && worker.IsRemote)
{
var cacheHitData = cacheResult.GetCacheHitData();
m_pipTwoPhaseCache.ReportRemoteMetadataAndPathSet(
cacheHitData.Metadata,
cacheHitData.MetadataHash,
cacheHitData.PathSet,
cacheHitData.PathSetHash,
cacheResult.WeakFingerprint,
cacheHitData.StrongFingerprint,
isExecution: false,
process.PreservePathSetCasing);
}
}
if (cacheResult.CanRunFromCache)
{
// Always execute the process if the determinism probe is enabled.
// Pips that must be run due to non-determinism are NOT counted as cache misses.
if (!m_configuration.Cache.DeterminismProbe)
{
return PipExecutionStep.RunFromCache;
}
}
else if (m_configuration.Schedule.CacheOnly)
{
// CacheOnly mode performs only cache lookups and skips execution for pips that miss the cache.
environment.Counters.IncrementCounter(PipExecutorCounter.ProcessPipsSkippedExecutionDueToCacheOnly);
PipRuntimeInfo pipRuntimeInfo = GetPipRuntimeInfo(pipId);
pipRuntimeInfo.Transition(m_pipStateCounters, pipType, PipState.Skipped);
return PipExecutionStep.Skip;
}
else
{
environment.Counters.IncrementCounter(PipExecutorCounter.ProcessPipsExecutedDueToCacheMiss);
}
return PipExecutionStep.ChooseWorkerCpu;
}
case PipExecutionStep.RunFromCache:
{
Contract.Assert(processRunnable != null);
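// Replay the result from the cache hit (including cached warnings) instead of executing the process.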
Process process = (Process)processRunnable.Pip;
var pipScope = State.GetScope(process);
var executionResult = await PipExecutor.RunFromCacheWithWarningsAsync(operationContext, environment, pipScope, process, processRunnable.CacheResult, processRunnable.Description);
return processRunnable.SetPipResult(executionResult);
}
case PipExecutionStep.ExecuteProcess:
{
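// Execute the process on the chosen worker, record per-group performance counters,
// and handle retries for resource exhaustion and other retryable failures.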
MarkPipStartExecuting();
if (processRunnable.Weight > 1)
{
// Only log for pips with non-standard process weights
Logger.Log.ProcessPipProcessWeight(loggingContext, processRunnable.Description, processRunnable.Weight);
}
processRunnable.Executed = true;
var executionResult = await worker.ExecuteProcessAsync(processRunnable);
// Don't count service pips in process pip counters
if (!processRunnable.Process.IsStartOrShutdownKind && executionResult.PerformanceInformation != null)
{
var perfInfo = executionResult.PerformanceInformation;
try
{
m_groupedPipCounters.AddToCounters(processRunnable.Process,
new[]
{
(PipCountersByGroup.IOReadBytes, (long) perfInfo.IO.ReadCounters.TransferCount),
(PipCountersByGroup.IOWriteBytes, (long) perfInfo.IO.WriteCounters.TransferCount)
},
new[] { (PipCountersByGroup.ExecuteProcessDuration, perfInfo.ProcessExecutionTime) }
);
}
catch (OverflowException ex)
{
Logger.Log.ExecutePipStepOverflowFailure(operationContext, ex.Message);
m_groupedPipCounters.AddToCounters(processRunnable.Process,
new[] { (PipCountersByGroup.IOReadBytes, 0L), (PipCountersByGroup.IOWriteBytes, 0L) },
new[] { (PipCountersByGroup.ExecuteProcessDuration, perfInfo.ProcessExecutionTime) }
);
}
}
// The pip was canceled due to a retryable failure
if (executionResult.Result == PipResultStatus.Canceled && !IsTerminating)
{
Contract.Assert(executionResult.RetryInfo != null, $"Retry Information is required for all retry cases. IsTerminating: {m_scheduleTerminating}");
RetryReason retryReason = executionResult.RetryInfo.RetryReason;
if (worker.IsLocal)
{
// Because the scheduler will re-run this pip, we have to nuke all outputs created under shared opaque directories
var sharedOpaqueOutputs = FlagAndReturnScrubbableSharedOpaqueOutputs(environment, processRunnable);
ScrubSharedOpaqueOutputs(sharedOpaqueOutputs);
}
// If this is a single-machine build or the distributed-build orchestrator
if (!IsDistributedBuild || IsDistributedOrchestrator)
{
if (retryReason == RetryReason.ResourceExhaustion)
{
// Use the max of the observed memory and the worker's expected memory (multiplied by 1.25 to raise the expectation) for the pip
var expectedCounters = processRunnable.ExpectedMemoryCounters.Value;
var actualCounters = executionResult.PerformanceInformation?.MemoryCounters;
processRunnable.ExpectedMemoryCounters = ProcessMemoryCounters.CreateFromMb(
peakWorkingSetMb: Math.Max((int)(expectedCounters.PeakWorkingSetMb * 1.25), actualCounters?.PeakWorkingSetMb ?? 0),
averageWorkingSetMb: Math.Max((int)(expectedCounters.AverageWorkingSetMb * 1.25), actualCounters?.AverageWorkingSetMb ?? 0),
peakCommitSizeMb: Math.Max((int)(expectedCounters.PeakCommitSizeMb * 1.25), actualCounters?.PeakCommitSizeMb ?? 0),
averageCommitSizeMb: Math.Max((int)(expectedCounters.AverageCommitSizeMb * 1.25), actualCounters?.AverageCommitSizeMb ?? 0));
if (m_scheduleConfiguration.MaxRetriesDueToLowMemory.HasValue &&
processRunnable.Performance.RetryCountDueToLowMemory == m_scheduleConfiguration.MaxRetriesDueToLowMemory)
{
Logger.Log.ExcessivePipRetriesDueToLowMemory(operationContext, processRunnable.Description, processRunnable.Performance.RetryCountDueToLowMemory);
return runnablePip.SetPipResult(PipResultStatus.Failed);
}
else
{
Logger.Log.PipRetryDueToLowMemory(operationContext, processRunnable.Description, worker.DefaultWorkingSetMbPerProcess, expectedCounters.PeakWorkingSetMb, actualCounters?.PeakWorkingSetMb ?? 0);
}
}
else if (retryReason.IsPreProcessExecOrRemotingInfraFailure())
{
if (processRunnable.Performance.RetryCountDueToRetryableFailures == m_scheduleConfiguration.MaxRetriesDueToRetryableFailures)
{
Logger.Log.ExcessivePipRetriesDueToRetryableFailures(operationContext, processRunnable.Description,
processRunnable.Performance.RetryCountDueToRetryableFailures, executionResult.RetryInfo.RetryReason.ToString());
return runnablePip.SetPipResult(PipResultStatus.Failed);
}
else
{
Logger.Log.PipRetryDueToRetryableFailures(operationContext, processRunnable.Description, retryReason.ToString());
if (retryReason == RetryReason.RemoteFallback)
{
// Force local execution.
processRunnable.RunLocation = ProcessRunLocation.Local;
}
}
}
}
return processRunnable.SetPipResult(executionResult.Result);
}
m_pipPropertyInfo.UpdatePipPropertyInfo(processRunnable, executionResult);
m_pipRetryInfo.UpdatePipRetryInfo(processRunnable, executionResult, PipExecutionCounters);
if (runnablePip.Worker?.IsRemote == true)
{
PipExecutionCounters.IncrementCounter(PipExecutorCounter.ProcessesExecutedRemotely);
}
if (m_configuration.Cache.DeterminismProbe && processRunnable.CacheResult.CanRunFromCache)
{
if (m_configuration.Cache.DisableDeterminismProbeLogging)
{
// Don't check determinism probe
return PipExecutionStep.RunFromCache;
}
// Compare strong fingerprints between execution and cache hit for determinism probe
return CheckMatchForDeterminismProbe(processRunnable);
}
return PipExecutionStep.PostProcess;
}
case PipExecutionStep.PostProcess:
{
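// Post-process the execution result: flag shared opaque outputs, analyze file-access violations (on the orchestrator),
// store to the cache when safe, handle cache convergence, and report output content.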
var executionResult = processRunnable.ExecutionResult;
if (executionResult.PerformanceInformation != null)
{
var perfInfo = executionResult.PerformanceInformation;
m_perPipPerformanceInfoStore.AddPip(new PerProcessPipPerformanceInformation(
ref processRunnable,
(int)perfInfo.ProcessExecutionTime.TotalMilliseconds,
perfInfo.MemoryCounters.PeakWorkingSetMb,
(int)Math.Ceiling(perfInfo.IO.ReadCounters.TransferCount / BytesInMb),
(int)Math.Ceiling(perfInfo.IO.WriteCounters.TransferCount / BytesInMb)));
}
// Make sure all shared opaque outputs are flagged as such.
// We need to do this even if the pip failed, so that any writes under shared opaques are flagged regardless.
// This allows the scrubber to remove those files as well in the next run.
var start = DateTime.UtcNow;
var sharedOpaqueOutputs = FlagAndReturnScrubbableSharedOpaqueOutputs(environment, processRunnable);
LogSubPhaseDuration(operationContext, runnablePip.Pip, SandboxedProcessFactory.SandboxedProcessCounters.SchedulerPhaseFlaggingSharedOpaqueOutputs, DateTime.UtcNow.Subtract(start), $"(count: {sharedOpaqueOutputs.Count})");
// Set the process as executed. NOTE: We do this here rather than during ExecuteProcess to handle the
// case of processes executed remotely.
var pipScope = State.GetScope(processRunnable.Process);
bool pipIsSafeToCache = true;
IReadOnlyDictionary<FileArtifact, (FileMaterializationInfo, ReportedViolation)> allowedSameContentViolations = null;
if (!IsDistributedWorker)
{
var expectedMemoryCounters = processRunnable.ExpectedMemoryCounters.Value;
int peakWorkingSetMb = executionResult.PerformanceInformation?.MemoryCounters.PeakWorkingSetMb ?? 0;
int averageWorkingSetMb = executionResult.PerformanceInformation?.MemoryCounters.AverageWorkingSetMb ?? 0;
int peakCommitSizeMb = executionResult.PerformanceInformation?.MemoryCounters.PeakCommitSizeMb ?? 0;
int averageCommitSizeMb = executionResult.PerformanceInformation?.MemoryCounters.AverageCommitSizeMb ?? 0;
try
{
Logger.Log.ProcessPipExecutionInfo(
operationContext,
runnablePip.Description,
executionResult.PerformanceInformation?.NumberOfProcesses ?? 0,
(processRunnable.HistoricPerfData?.DurationInMs ?? 0) / 1000.0,
executionResult.PerformanceInformation?.ProcessExecutionTime.TotalSeconds ?? 0,
executionResult.PerformanceInformation?.ProcessorsInPercents ?? 0,
processRunnable.Weight,
worker.DefaultWorkingSetMbPerProcess,
expectedMemoryCounters.PeakWorkingSetMb,
peakWorkingSetMb,
expectedMemoryCounters.AverageWorkingSetMb,
averageWorkingSetMb,
expectedMemoryCounters.PeakCommitSizeMb,
peakCommitSizeMb,
expectedMemoryCounters.AverageCommitSizeMb,
averageCommitSizeMb,
(int)(processRunnable.HistoricPerfData?.DiskIOInMB ?? 0),
(int)ByteSizeFormatter.ToMegabytes(executionResult.PerformanceInformation?.IO.GetAggregateIO().TransferCount ?? 0));
m_totalPeakWorkingSetMb += (ulong)peakWorkingSetMb;
m_totalAverageWorkingSetMb += (ulong)averageWorkingSetMb;
m_totalPeakCommitSizeMb += (ulong)peakCommitSizeMb;
m_totalAverageCommitSizeMb += (ulong)averageCommitSizeMb;
}
catch (OverflowException ex)
{
Logger.Log.ExecutePipStepOverflowFailure(operationContext, ex.Message);
}
// File violation analysis needs to happen on the orchestrator as it relies on
// graph-wide data, such as detecting duplicates across pips.
start = DateTime.UtcNow;
executionResult = PipExecutor.AnalyzeFileAccessViolations(
operationContext,
environment,
pipScope,
executionResult,
processRunnable.Process,
out pipIsSafeToCache,
out allowedSameContentViolations);
LogSubPhaseDuration(operationContext, runnablePip.Pip, SandboxedProcessCounters.SchedulerPhaseAnalyzingFileAccessViolations, DateTime.UtcNow.Subtract(start));
processRunnable.SetExecutionResult(executionResult);
if (executionResult.Result.IndicatesFailure())
{
// Dependency analysis failure. Bail out before performing post processing. This prevents
// the output from being cached as well as downstream pips from being run.
return processRunnable.SetPipResult(executionResult);
}
}
if (pipIsSafeToCache)
{
// The worker should only cache the pip if the violation analyzer allows it to.
executionResult = await worker.PostProcessAsync(processRunnable);
}
else
{
Logger.Log.ScheduleProcessNotStoredToCacheDueToFileMonitoringViolations(loggingContext, processRunnable.Description);
}
// If the result converged, we should delete the shared opaque outputs produced where the execution happened. On convergence, the result
// will be consumed from the already-cached pip, and the just-produced outputs should be absent.
if (executionResult.Converged && worker.IsLocal)
{
ScrubSharedOpaqueOutputs(sharedOpaqueOutputs);
}
if (!IsDistributedWorker)
{
m_chooseWorkerCpu.ReportProcessExecutionOutputs(processRunnable, executionResult);
// If the cache converged outputs, we need to check for double writes again, since the configured policy may care about
// the content of the (final) outputs
if (executionResult.Converged)
{
start = DateTime.UtcNow;
executionResult = PipExecutor.AnalyzeDoubleWritesOnCacheConvergence(
operationContext,
environment,
pipScope,
executionResult,
processRunnable.Process,
allowedSameContentViolations);
LogSubPhaseDuration(operationContext, runnablePip.Pip, SandboxedProcessCounters.SchedulerPhaseAnalyzingDoubleWrites, DateTime.UtcNow.Subtract(start));
processRunnable.SetExecutionResult(executionResult);
if (executionResult.Result.IndicatesFailure())
{
// Dependency analysis failure. Even though the pip is already cached, we got a cache-converged event, so
// it is safe for other downstream pips to consume the cached result. However, some double writes were found based
// on the configured policy, so we fail the build.
return processRunnable.SetPipResult(executionResult);
}
}
}
if (runnablePip.Worker?.IsRemote == true)
{
m_pipTwoPhaseCache.ReportRemoteMetadataAndPathSet(
executionResult.PipCacheDescriptorV2Metadata,
executionResult.TwoPhaseCachingInfo?.CacheEntry.MetadataHash,
executionResult.PathSet,
executionResult.TwoPhaseCachingInfo?.PathSetHash,
executionResult.TwoPhaseCachingInfo?.WeakFingerprint,
executionResult.TwoPhaseCachingInfo?.StrongFingerprint,
isExecution: !executionResult.Converged,
preservePathCasing: processRunnable.Process.PreservePathSetCasing);
}
// Output content is reported here to ensure that it happens both on the worker executing PostProcess and
// on the orchestrator that asked the worker to execute PostProcess.
start = DateTime.UtcNow;
PipExecutor.ReportExecutionResultOutputContent(
operationContext,
environment,
processRunnable.Pip.SemiStableHash,
executionResult,
processRunnable.Process.RewritePolicy.ImpliesDoubleWriteIsWarning());
LogSubPhaseDuration(operationContext, runnablePip.Pip, SandboxedProcessCounters.SchedulerPhaseReportingOutputContent, DateTime.UtcNow.Subtract(start), $"(num outputs: {executionResult.OutputContent.Length})");
return processRunnable.SetPipResult(executionResult);
}
case PipExecutionStep.HandleResult:
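// Report the completed pip's result to the scheduler so bookkeeping is updated and dependents can be scheduled.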
await OnPipCompleted(runnablePip);
return PipExecutionStep.Done;
default:
throw Contract.AssertFailure(I($"Do not know how to run this pip step: '{step}'"));
}
}