in Public/Src/Engine/Scheduler/PipExecutor.cs [1584:1998]
/// <summary>
/// Executes a process pip in the sandbox, retrying it inline when it exits with one of the pip's
/// user-specified retry exit codes, or when the sandboxed execution reports an internal failure that
/// can be retried inline.
/// </summary>
/// <remarks>
/// User retries are bounded by <c>pip.ProcessRetries</c> (and only enabled when <c>pip.RetryExitCodes</c>
/// is non-empty); internal retries are bounded by <c>InternalSandboxedProcessExecutionFailureRetryCountMax</c>.
/// Each attempt creates a fresh <c>SandboxedProcessPipExecutor</c> and re-registers the memory-query,
/// empty-working-set and resume callbacks on <paramref name="resourceScope"/> for that executor.
/// </remarks>
/// <param name="resourceScope">
/// Resource-manager scope. Its token cancels the running process; its <c>CancellationReason</c> and
/// <c>SuspendedDurationMs</c> are folded into the returned result.
/// </param>
/// <param name="operationContext">Context used for logging and timed counter operations.</param>
/// <param name="pip">The process pip to execute.</param>
/// <param name="expectedMemoryCounters">Expected memory usage; only reported in the resource-exhaustion cancellation logs.</param>
/// <param name="environment">Execution environment (context, counters, configuration, caches, sandbox connection, ...).</param>
/// <param name="state">Per-pip scope state supplying the semantic path expander and the file access allowlist.</param>
/// <param name="processIdListener">Forwarded to the sandboxed executor.</param>
/// <param name="detoursEventListener">Forwarded to the sandboxed executor.</param>
/// <param name="runLocation">Forwarded to the sandboxed executor.</param>
/// <param name="start">
/// The incoming value is not read: it is reset at the beginning of every attempt and used to measure the
/// running-pip sub-phase duration of that attempt.
/// </param>
/// <returns>
/// The result of the last attempt, annotated with the suspended duration, the user-retry flag, the pip
/// properties summed across all attempts, and a resource-exhaustion retry info when the resource manager
/// canceled the process.
/// </returns>
private static async Task<SandboxedProcessPipExecutionResult> ExecutePipAndHandleRetryAsync(ProcessResourceManager.ResourceScope resourceScope,
    OperationContext operationContext,
    Process pip,
    ProcessMemoryCounters expectedMemoryCounters,
    IPipExecutionEnvironment environment,
    PipExecutionState.PipScopeState state,
    Action<int> processIdListener,
    IDetoursEventListener detoursEventListener,
    ProcessRunLocation runLocation,
    DateTime start)
{
    var context = environment.Context;
    var counters = environment.Counters;
    var configuration = environment.Configuration;
    var pathTable = context.PathTable;
    string processDescription = pip.GetDescription(context);

    // When preserving outputs, we need to make sure to remove any hardlinks to the cache.
    // Returns true when the output is (or has been made) private; false when the file does not exist
    // or making it private failed (the failure is logged).
    Func<string, Task<bool>> makeOutputPrivate =
        async path =>
        {
            try
            {
                if (!FileUtilities.FileExistsNoFollow(path))
                {
                    // Output file doesn't exist. No need to make it private,
                    // but return false so BuildXL ensures the output directory is created.
                    return false;
                }

                if (FileUtilities.GetHardLinkCount(path) == 1 &&
                    FileUtilities.HasWritableAccessControl(path))
                {
                    // Output file is already private. File will not be deleted.
                    return true;
                }

                // We want to use a temp filename that's as short as the original filename.
                // To achieve this, we use the random filename generator from System.IO
                var maybePrivate = await FileUtilities.TryMakeExclusiveLinkAsync(
                    path,
                    optionalTemporaryFileName: Path.GetRandomFileName(),
                    preserveOriginalTimestamp: true);

                if (!maybePrivate.Succeeded)
                {
                    maybePrivate.Failure.Throw();
                }

                return true;
            }
            catch (BuildXLException ex)
            {
                Logger.Log.PreserveOutputsFailedToMakeOutputPrivate(
                    operationContext,
                    processDescription,
                    path,
                    ex.GetLogEventMessage());
                return false;
            }
        };

    // To do in-place rewrites, we need to make writable, private copies of inputs to be rewritten (they may be read-only hardlinks into the cache, for example).
    // Returns false (after logging) when the content could not be ingressed or materialized as a writable copy.
    Func<FileArtifact, Task<bool>> makeInputPrivate =
        async artifactNeededPrivate =>
        {
            FileMaterializationInfo inputMaterializationInfo =
                environment.State.FileContentManager.GetInputContent(artifactNeededPrivate);

            if (inputMaterializationInfo.ReparsePointInfo.IsActionableReparsePoint)
            {
                // Do nothing in case of re-writing a symlink --- a process can safely change
                // symlink's target since it won't affect things in CAS.
                return true;
            }

            ContentHash artifactHash = inputMaterializationInfo.Hash;

            // Source files aren't guaranteed in cache, until we first have a reason to ingress them.
            // Note that this is only relevant for source files rewritten in place, which is only
            // used in some team-internal trace-conversion scenarios as of writing.
            if (artifactNeededPrivate.IsSourceFile)
            {
                // We assume that source files cannot be made read-only so we use copy file materialization
                // rather than ever hardlinking
                var maybeStored = await environment.LocalDiskContentStore.TryStoreAsync(
                    environment.Cache.ArtifactContentCache,
                    fileRealizationModes: FileRealizationMode.Copy,
                    path: artifactNeededPrivate.Path,
                    tryFlushPageCacheToFileSystem: false,
                    knownContentHash: artifactHash,
                    // Source should have been tracked by hash-source file pip, no need to retrack.
                    trackPath: false,
                    isReparsePoint: false);

                if (!maybeStored.Succeeded)
                {
                    Logger.Log.StorageCacheIngressFallbackContentToMakePrivateError(
                        operationContext,
                        processDescription,
                        contentHash: artifactHash.ToHex(),
                        fallbackPath:
                            artifactNeededPrivate.Path.ToString(pathTable),
                        errorMessage: maybeStored.Failure.DescribeIncludingInnerFailures());
                    return false;
                }
            }

            // We need a private version of the output - it must be writable and have link count 1.
            // We can achieve that property by forcing a copy of the content (by hash) out of cache.
            // The content should be in the cache in usual cases. See special case above for source-file rewriting
            // (should not be common; only used in some trace-conversion scenarios as of writing).
            var maybeMadeWritable =
                await
                    environment.LocalDiskContentStore
                        .TryMaterializeTransientWritableCopyAsync(
                            environment.Cache.ArtifactContentCache,
                            artifactNeededPrivate.Path,
                            artifactHash,
                            environment.Context.CancellationToken);

            if (!maybeMadeWritable.Succeeded)
            {
                Logger.Log.StorageCacheGetContentError(
                    operationContext,
                    processDescription,
                    contentHash: artifactHash.ToHex(),
                    destinationPath:
                        artifactNeededPrivate.Path.ToString(pathTable),
                    errorMessage:
                        maybeMadeWritable.Failure.DescribeIncludingInnerFailures());
                return false;
            }

            return true;
        };

    SemanticPathExpander semanticPathExpander = state.PathExpander;
    var processMonitoringLogger = new ProcessExecutionMonitoringLogger(operationContext, pip, context, environment.State.ExecutionLog);

    // Inner cancellation token source for tracking cancellation time
    using (var innerResourceLimitCancellationTokenSource = new CancellationTokenSource())
    using (var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(innerResourceLimitCancellationTokenSource.Token, environment.SchedulerCancellationToken))
    using (var counter = operationContext.StartOperation(PipExecutorCounter.ProcessPossibleRetryWallClockDuration))
    {
        ProcessMemoryCountersSnapshot lastObservedMemoryCounters = default(ProcessMemoryCountersSnapshot);
        TimeSpan? cancellationStartTime = null;

        // When the resource manager cancels the scope, record when cancellation started (used for the
        // cancel-duration telemetry below) and cancel the running process through the linked token.
        using var cancellationTokenRegistration = resourceScope.Token.Register(
            () =>
            {
                cancellationStartTime = TimestampUtilities.Timestamp;

                Logger.Log.StartCancellingProcessPipExecutionDueToResourceExhaustion(
                    operationContext,
                    processDescription,
                    resourceScope.CancellationReason?.ToString() ?? "",
                    resourceScope.ScopeId,
                    (long)(counter.Duration?.TotalMilliseconds ?? -1),
                    expectedMemoryCounters.PeakWorkingSetMb,
                    lastObservedMemoryCounters.PeakWorkingSetMb,
                    lastObservedMemoryCounters.LastWorkingSetMb,
                    lastObservedMemoryCounters.LastCommitSizeMb);

                using (operationContext.StartAsyncOperation(PipExecutorCounter.ResourceLimitCancelProcessDuration))
                {
                    innerResourceLimitCancellationTokenSource.Cancel();
                }
            });

        IReadOnlyList<AbsolutePath> changeAffectedInputs = pip.ChangeAffectedInputListWrittenFile.IsValid
            ? environment.State.FileContentManager.SourceChangeAffectedInputs.GetChangeAffectedInputs(pip)
            : null;

        int remainingUserRetries = pip.RetryExitCodes.Length > 0 ? pip.ProcessRetries : 0;
        int remainingInternalSandboxedProcessExecutionFailureRetries = InternalSandboxedProcessExecutionFailureRetryCountMax;

        // 'firstAttempt' becomes false once ExternalProcessCount has been incremented; the finally block
        // below relies on it to balance that increment exactly once.
        bool firstAttempt = true;
        bool userRetry = false;
        SandboxedProcessPipExecutionResult result;
        var aggregatePipProperties = new Dictionary<string, int>();
        IReadOnlyDictionary<AbsolutePath, IReadOnlyCollection<FileArtifactWithAttributes>> staleDynamicOutputs = null;

        try
        {
            // Retry pip count up to limit if we produce result without detecting file access.
            // There are very rare cases where a child process is started not Detoured and we don't observe any file accesses from such process.
            while (true)
            {
                lastObservedMemoryCounters = default(ProcessMemoryCountersSnapshot);

                var executor = new SandboxedProcessPipExecutor(
                    context,
                    operationContext.LoggingContext,
                    pip,
                    configuration,
                    environment.RootMappings,
                    environment.ProcessInContainerManager,
                    state.FileAccessAllowlist,
                    makeInputPrivate,
                    makeOutputPrivate,
                    semanticPathExpander,
                    sidebandState: environment.State.SidebandState,
                    pipEnvironment: environment.State.PipEnvironment,
                    directoryArtifactContext: new DirectoryArtifactContext(environment),
                    logger: processMonitoringLogger,
                    processIdListener: processIdListener,
                    pipDataRenderer: environment.PipFragmentRenderer,
                    buildEngineDirectory: configuration.Layout.BuildEngineDirectory,
                    directoryTranslator: environment.DirectoryTranslator,
                    remainingUserRetryCount: remainingUserRetries,
                    vmInitializer: environment.VmInitializer,
                    remoteProcessManager: environment.RemoteProcessManager,
                    tempDirectoryCleaner: environment.TempCleaner,
                    changeAffectedInputs: changeAffectedInputs,
                    detoursListener: detoursEventListener,
                    reparsePointResolver: environment.ReparsePointAccessResolver,
                    staleOutputsUnderSharedOpaqueDirectories: staleDynamicOutputs,
                    pluginManager: environment.PluginManager,
                    pipGraphFileSystemView: environment.PipGraphView,
                    runLocation: runLocation);

                // The resource manager callbacks are re-registered for every attempt so they target the
                // executor of the attempt currently running.
                resourceScope.RegisterQueryRamUsageMb(
                    () =>
                    {
                        using (counters[PipExecutorCounter.QueryRamUsageDuration].Start())
                        {
                            lastObservedMemoryCounters = executor.GetMemoryCountersSnapshot() ?? default(ProcessMemoryCountersSnapshot);
                            return lastObservedMemoryCounters;
                        }
                    });

                resourceScope.RegisterEmptyWorkingSet(
                    (bool isSuspend) =>
                    {
                        using (counters[PipExecutorCounter.EmptyWorkingSetDuration].Start())
                        {
                            // Named distinctly from the method-level 'result' to avoid shadowing it.
                            var emptyWorkingSetResult = executor.TryEmptyWorkingSet(isSuspend);
                            if (emptyWorkingSetResult == EmptyWorkingSetResult.Success)
                            {
                                counters.IncrementCounter(PipExecutorCounter.EmptyWorkingSetSucceeded);

                                if (resourceScope.SuspendedDurationMs > 0)
                                {
                                    counters.IncrementCounter(PipExecutorCounter.EmptyWorkingSetSucceededMoreThanOnce);
                                }
                            }

                            return emptyWorkingSetResult;
                        }
                    });

                resourceScope.RegisterResumeProcess(
                    () =>
                    {
                        using (counters[PipExecutorCounter.ResumeProcessDuration].Start())
                        {
                            return executor.TryResumeProcess();
                        }
                    });

                if (firstAttempt)
                {
                    counters.IncrementCounter(PipExecutorCounter.ExternalProcessCount);
                    environment.SetMaxExternalProcessRan();
                    firstAttempt = false;
                }

                using (var sidebandWriter = CreateSidebandWriterIfNeeded(environment, pip))
                {
                    staleDynamicOutputs = null;

                    // 'start' is reused as the start timestamp of the current attempt.
                    start = DateTime.UtcNow;
                    result = await executor.RunAsync(
                        linkedCancellationTokenSource.Token,
                        sandboxConnection: environment.SandboxConnection,
                        sidebandWriter: sidebandWriter,
                        fileSystemView: pip.AllowUndeclaredSourceReads ? environment.State.FileSystemView : null);
                    LogSubPhaseDuration(operationContext, pip, SandboxedProcessCounters.PipExecutorPhaseRunningPip, DateTime.UtcNow.Subtract(start));

                    // Shared opaque writes of this attempt are handed to the next attempt's executor as stale outputs.
                    staleDynamicOutputs = result.SharedDynamicDirectoryWriteAccesses;
                }

                if (result.PipProperties != null)
                {
                    foreach (var kvp in result.PipProperties)
                    {
                        // TryGetValue leaves 'existing' at 0 for an unseen key, so the indexer either adds or accumulates.
                        aggregatePipProperties.TryGetValue(kvp.Key, out var existing);
                        aggregatePipProperties[kvp.Key] = existing + kvp.Value;
                    }
                }

                lock (s_telemetryDetoursHeapLock)
                {
                    var currentMaxDetoursHeapInBytes = counters.GetCounterValue(PipExecutorCounter.MaxDetoursHeapInBytes);
                    if (currentMaxDetoursHeapInBytes < result.MaxDetoursHeapSizeInBytes)
                    {
                        // Counters only support additive updates: add the difference so the counter
                        // moves straight to the new maximum without passing through zero.
                        counters.AddToCounter(
                            PipExecutorCounter.MaxDetoursHeapInBytes,
                            result.MaxDetoursHeapSizeInBytes - currentMaxDetoursHeapInBytes);
                    }
                }

                if (result.RetryInfo?.RetryMode == RetryMode.Reschedule)
                {
                    Logger.Log.PipProcessToBeRetriedByReschedule(operationContext,
                        processDescription, result.RetryInfo.RetryReason.ToString());
                }

                if (result.RetryInfo?.RetryReason == RetryReason.UserSpecifiedExitCode)
                {
                    Contract.Assert(remainingUserRetries > 0);

                    --remainingUserRetries;

                    LogUserSpecifiedExitCodeEvent(result, operationContext, context, pip, processDescription, remainingUserRetries);

                    userRetry = true;
                    counters.AddToCounter(PipExecutorCounter.RetriedUserExecutionDuration, result.PrimaryProcessTimes.TotalWallClockTime);
                    counters.IncrementCounter(PipExecutorCounter.ProcessUserRetries);
                    continue;
                }

                if (result.RetryInfo.CanBeRetriedInlineOrFalseIfNull())
                {
                    if (remainingInternalSandboxedProcessExecutionFailureRetries <= 0)
                    {
                        if (result.RetryInfo.RetryMode == RetryMode.Inline)
                        {
                            // Log errors for inline retry on the same worker which have reached their local retry limit
                            LogRetryInlineErrors(result.RetryInfo.RetryReason, operationContext, pip, processDescription);
                            break;
                        }
                        else // Case: RetryMode.Both -- inline retries are exhausted, so fall back to rescheduling.
                        {
                            Logger.Log.PipProcessToBeRetriedByReschedule(operationContext, processDescription, result.RetryInfo.RetryReason.ToString());
                            break;
                        }
                    }
                    else
                    {
                        if (EngineEnvironmentSettings.DisableDetoursRetries && result.RetryInfo.RetryReason.IsDetoursRetrableFailure())
                        {
                            Logger.Log.DisabledDetoursRetry(operationContext, pip.SemiStableHash, processDescription, result.RetryInfo.RetryReason.ToString());
                            break;
                        }

                        --remainingInternalSandboxedProcessExecutionFailureRetries;

                        Logger.Log.PipProcessRetriedInline(operationContext,
                            InternalSandboxedProcessExecutionFailureRetryCountMax - remainingInternalSandboxedProcessExecutionFailureRetries,
                            InternalSandboxedProcessExecutionFailureRetryCountMax,
                            processDescription, result.RetryInfo.RetryReason.ToString());

                        counters.AddToCounter(PipExecutorCounter.RetriedInternalExecutionDuration, result.PrimaryProcessTimes.TotalWallClockTime);

                        if (!IncrementInternalErrorRetryCounters(result.RetryInfo.RetryReason, counters))
                        {
                            Contract.Assert(false, "Unexpected result error type.");
                        }

                        continue;
                    }
                }

                // Not retryable inline: the result of this attempt is final.
                break;
            }
        }
        finally
        {
            // Balance the increment made on the first attempt even when an exception escapes the retry
            // loop, so the external process gauge cannot drift upwards.
            if (!firstAttempt)
            {
                counters.DecrementCounter(PipExecutorCounter.ExternalProcessCount);
            }
        }

        result.SuspendedDurationMs = resourceScope.SuspendedDurationMs;

        if (result.Status == SandboxedProcessPipExecutionStatus.Canceled && resourceScope.CancellationReason.HasValue)
        {
            // The process was canceled by the resource manager (not by the user/scheduler): mark it for retry.
            result.RetryInfo = RetryInfo.GetDefault(RetryReason.ResourceExhaustion);
            counters.IncrementCounter(resourceScope.CancellationReason == ProcessResourceManager.ResourceScopeCancellationReason.ResourceLimits ?
                PipExecutorCounter.ProcessRetriesDueToResourceLimits :
                PipExecutorCounter.ProcessRetriesDueToSuspendOrResumeFailure);

            // Null when the cancellation callback never ran; reported as 0 below.
            TimeSpan? cancelTime = TimestampUtilities.Timestamp - cancellationStartTime;

            Logger.Log.CancellingProcessPipExecutionDueToResourceExhaustion(
                operationContext,
                processDescription,
                resourceScope.CancellationReason.ToString(),
                (long)(operationContext.Duration?.TotalMilliseconds ?? -1),
                peakMemoryMb: result.JobAccountingInformation?.MemoryCounters.PeakWorkingSetMb ?? 0,
                expectedMemoryMb: expectedMemoryCounters.PeakWorkingSetMb,
                peakCommitMb: result.JobAccountingInformation?.MemoryCounters.PeakCommitSizeMb ?? 0,
                expectedCommitMb: expectedMemoryCounters.PeakCommitSizeMb,
                cancelMilliseconds: (int)(cancelTime?.TotalMilliseconds ?? 0));
        }

        if (userRetry)
        {
            counters.IncrementCounter(PipExecutorCounter.ProcessUserRetriesImpactedPipsCount);
            result.HadUserRetries = true;
        }

        if (aggregatePipProperties.Count > 0)
        {
            result.PipProperties = aggregatePipProperties;
        }

        return result;
    }
}