in src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java [541:877]
public void run() {
SkyFunctionEnvironment env = null;
try {
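// Fetch this node's entry from the graph and check that it is ready to be evaluated, i.e. it has
// been signalled by all of its previously requested deps.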
NodeEntry nodeEntry =
Preconditions.checkNotNull(graph.get(null, Reason.EVALUATION, skyKey), skyKey);
Preconditions.checkState(nodeEntry.isReady(), "%s %s", skyKey, nodeEntry);
try {
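// Dirtiness check: if maybeHandleDirtyNode fully handles the node (e.g. it can be marked clean
// with its previous value, or it must first wait for its old deps to be re-checked), we are done
// here.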
evaluatorContext.getProgressReceiver().stateStarting(skyKey, NodeState.CHECK_DIRTY);
if (maybeHandleDirtyNode(nodeEntry) == DirtyOutcome.ALREADY_PROCESSED) {
return;
}
} finally {
evaluatorContext.getProgressReceiver().stateEnding(skyKey, NodeState.CHECK_DIRTY);
}
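// Deps registered in a previous evaluation that this evaluation has not yet accounted for; used
// below to decide whether a reverse-dep edge to a child already exists.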
ImmutableSet<SkyKey> oldDeps = nodeEntry.getAllRemainingDirtyDirectDeps();
try {
evaluatorContext
.getProgressReceiver()
.stateStarting(skyKey, NodeState.INITIALIZING_ENVIRONMENT);
env =
SkyFunctionEnvironment.create(
skyKey, nodeEntry.getTemporaryDirectDeps(), oldDeps, evaluatorContext);
} catch (UndonePreviouslyRequestedDeps undonePreviouslyRequestedDeps) {
// If a previously requested dep is no longer done, restart this node from scratch.
stateCache.invalidate(skyKey);
restart(skyKey, nodeEntry);
evaluatorContext.getVisitor().enqueueEvaluation(skyKey, determineRestartPriority());
return;
} finally {
evaluatorContext
.getProgressReceiver()
.stateEnding(skyKey, NodeState.INITIALIZING_ENVIRONMENT);
}
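// Look up the SkyFunction registered for this key's function name.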
SkyFunctionName functionName = skyKey.functionName();
SkyFunction skyFunction =
Preconditions.checkNotNull(
evaluatorContext.getSkyFunctions().get(functionName),
"Unable to find SkyFunction '%s' for node with key %s, %s",
functionName,
skyKey,
nodeEntry);
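// Run the SkyFunction's compute step, reporting its state to the progress receiver and logging
// the elapsed time to the profiler.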
SkyValue value = null;
long startTimeNanos = BlazeClock.instance().nanoTime();
try {
try {
evaluatorContext.getProgressReceiver().stateStarting(skyKey, NodeState.COMPUTE);
value = skyFunction.compute(skyKey, env);
} finally {
evaluatorContext.getProgressReceiver().stateEnding(skyKey, NodeState.COMPUTE);
long elapsedTimeNanos = BlazeClock.instance().nanoTime() - startTimeNanos;
if (elapsedTimeNanos > 0) {
Profiler.instance()
.logSimpleTaskDuration(
startTimeNanos,
Duration.ofNanos(elapsedTimeNanos),
ProfilerTask.SKYFUNCTION,
skyKey.functionName().getName());
}
}
} catch (final SkyFunctionException builderException) {
stateCache.invalidate(skyKey);
ReifiedSkyFunctionException reifiedBuilderException =
new ReifiedSkyFunctionException(builderException);
// In keep-going mode, we do not let SkyFunctions complete with a thrown error if they
// have missing deps. Instead, we wait until their deps are done and restart the
// SkyFunction, so we can have a definitive error and definitive graph structure, thus
// avoiding non-determinism. It's completely reasonable for SkyFunctions to throw eagerly
// because they do not know if they are in keep-going mode.
if (!evaluatorContext.keepGoing() || !env.valuesMissing()) {
boolean shouldFailFast =
!evaluatorContext.keepGoing() || builderException.isCatastrophic();
if (shouldFailFast) {
// After we commit this error to the graph, but before the doMutatingEvaluation call completes
// with the error, there is a race-like opportunity for the error to be used, either by an
// in-flight computation or by a future computation.
if (!evaluatorContext.getVisitor().preventNewEvaluations()) {
// This is not the first error encountered, so we ignore it so that we can terminate
// with the first error.
return;
} else {
logger.atWarning().withCause(builderException).log(
"Aborting evaluation while evaluating %s", skyKey);
}
}
if (maybeHandleRegisteringNewlyDiscoveredDepsForDoneEntry(
skyKey, nodeEntry, oldDeps, env, evaluatorContext.keepGoing())) {
// A newly requested dep transitioned from done to dirty before this node finished.
// If shouldFailFast is true, this node won't be signalled by any such newly dirtied
// dep (because new evaluations have been prevented), and this node is responsible for
// throwing the SchedulerException below.
// Otherwise, this node will be signalled again, and so we should return.
if (!shouldFailFast) {
return;
}
}
boolean isTransitivelyTransient =
reifiedBuilderException.isTransient()
|| env.isAnyDirectDepErrorTransitivelyTransient()
|| env.isAnyNewlyRequestedDepErrorTransitivelyTransient();
ErrorInfo errorInfo =
evaluatorContext
.getErrorInfoManager()
.fromException(skyKey, reifiedBuilderException, isTransitivelyTransient);
env.setError(nodeEntry, errorInfo);
Set<SkyKey> rdepsToBubbleUpTo = env.commitAndGetParents(nodeEntry);
if (shouldFailFast) {
evaluatorContext.signalParentsOnAbort(
skyKey, rdepsToBubbleUpTo, nodeEntry.getVersion());
throw SchedulerException.ofError(errorInfo, skyKey, rdepsToBubbleUpTo);
}
evaluatorContext.signalParentsAndEnqueueIfReady(
skyKey, rdepsToBubbleUpTo, nodeEntry.getVersion(), determineRestartPriority());
return;
}
} catch (RuntimeException re) {
// Programmer error (most likely NPE or a failed precondition in a SkyFunction). Output
// some context together with the exception.
String msg = prepareCrashMessage(skyKey, nodeEntry.getInProgressReverseDeps());
RuntimeException ex = new RuntimeException(msg, re);
evaluatorContext.getVisitor().noteCrash(ex);
throw ex;
} finally {
env.doneBuilding();
}
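// The SkyFunction signalled a restart rather than producing a usable value: drop its cached
// state, cancel any futures it registered, and re-enqueue the node to be evaluated from scratch.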
if (maybeHandleRestart(skyKey, nodeEntry, value)) {
stateCache.invalidate(skyKey);
cancelExternalDeps(env);
evaluatorContext.getVisitor().enqueueEvaluation(skyKey, determineRestartPriority());
return;
}
// Helper object holding all the newly requested deps that weren't known to the environment; it
// may contain duplicate elements.
GroupedListHelper<SkyKey> newDirectDeps = env.getNewlyRequestedDeps();
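// A non-null value means the SkyFunction completed successfully: commit the value to the graph
// and signal any parents waiting on this node.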
if (value != null) {
stateCache.invalidate(skyKey);
Preconditions.checkState(
!env.valuesMissing(),
"Evaluation of %s returned non-null value but requested dependencies that weren't "
+ "computed yet (one of %s), NodeEntry: %s",
skyKey,
newDirectDeps,
nodeEntry);
try {
evaluatorContext.getProgressReceiver().stateStarting(skyKey, NodeState.COMMIT);
if (maybeHandleRegisteringNewlyDiscoveredDepsForDoneEntry(
skyKey, nodeEntry, oldDeps, env, evaluatorContext.keepGoing())) {
// A newly requested dep transitioned from done to dirty before this node finished.
// This node will be signalled again, and so we should return.
return;
}
env.setValue(value);
Set<SkyKey> reverseDeps = env.commitAndGetParents(nodeEntry);
evaluatorContext.signalParentsAndEnqueueIfReady(
skyKey, reverseDeps, nodeEntry.getVersion(), determineRestartPriority());
} finally {
evaluatorContext.getProgressReceiver().stateEnding(skyKey, NodeState.COMMIT);
}
return;
}
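// From here on the value is null: either a requested dep was found to be in error (only possible
// in noKeepGoing mode, handled below) or deps were missing and this node must be re-evaluated
// once they are done.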
SkyKey childErrorKey = env.getDepErrorKey();
if (childErrorKey != null) {
Preconditions.checkState(
!evaluatorContext.keepGoing(), "%s %s %s", skyKey, nodeEntry, childErrorKey);
// We encountered a child error in noKeepGoing mode, so we want to fail fast. But we first
// need to add the edge between the current node and the child error it requested so that
// error bubbling can occur. Note that this edge will subsequently be removed during graph
// cleaning (since the current node will never be committed to the graph).
NodeEntry childErrorEntry =
Preconditions.checkNotNull(
graph.get(skyKey, Reason.OTHER, childErrorKey),
"skyKey: %s, nodeEntry: %s childErrorKey: %s",
skyKey,
nodeEntry,
childErrorKey);
if (newDirectDeps.contains(childErrorKey)) {
// Add this dep if it was just requested. In certain rare race conditions (see
// MemoizingEvaluatorTest.cachedErrorCausesRestart) this dep may have already been
// requested.
nodeEntry.addTemporaryDirectDeps(GroupedListHelper.create(childErrorKey));
DependencyState childErrorState;
if (oldDeps.contains(childErrorKey)) {
childErrorState = childErrorEntry.checkIfDoneForDirtyReverseDep(skyKey);
} else {
childErrorState = childErrorEntry.addReverseDepAndCheckIfDone(skyKey);
}
if (childErrorState != DependencyState.DONE) {
// The child in error may have transitioned from done to dirty between when this node
// discovered the error and now. Notify the graph inconsistency receiver so that we
// can crash if that's unexpected.
// We don't enqueue the child, even if it returns NEEDS_SCHEDULING, because we are
// about to shut down evaluation.
evaluatorContext
.getGraphInconsistencyReceiver()
.noteInconsistencyAndMaybeThrow(
skyKey,
ImmutableList.of(childErrorKey),
Inconsistency.BUILDING_PARENT_FOUND_UNDONE_CHILD);
}
}
SkyValue childErrorInfoMaybe =
Preconditions.checkNotNull(
env.maybeGetValueFromErrorOrDeps(childErrorKey),
"dep error found but then lost while building: %s %s",
skyKey,
childErrorKey);
ErrorInfo childErrorInfo =
Preconditions.checkNotNull(
ValueWithMetadata.getMaybeErrorInfo(childErrorInfoMaybe),
"dep error found but then wasn't an error while building: %s %s %s",
skyKey,
childErrorKey,
childErrorInfoMaybe);
evaluatorContext.getVisitor().preventNewEvaluations();
// TODO(b/166268889): Remove when fixed.
if (childErrorInfo.getException() instanceof IOException) {
logger.atInfo().withCause(childErrorInfo.getException()).log(
"Child %s with IOException forced abort of %s", childErrorKey, skyKey);
}
throw SchedulerException.ofError(childErrorInfo, childErrorKey, ImmutableSet.of(skyKey));
}
// TODO(bazel-team): This code is not safe to interrupt, because we would lose the state in
// newDirectDeps.
// TODO(bazel-team): An ill-behaved SkyFunction can throw us into an infinite loop where we
// add more dependencies on every run. [skyframe-core]
// Add all the newly requested dependencies to the temporary direct deps. Note that
// newDirectDeps does not contain any elements in common with the already existing temporary
// direct deps. uniqueNewDeps will be the set of unique keys contained in newDirectDeps.
Set<SkyKey> uniqueNewDeps = nodeEntry.addTemporaryDirectDeps(newDirectDeps);
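// Futures the SkyFunction registered as external deps via the environment, if any; they must
// complete before this node can be re-evaluated.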
List<ListenableFuture<?>> externalDeps = env.externalDeps;
// The SkyFunction returned null without requesting any new dependencies or registering external
// deps, which means either at least one of its existing dependencies was in error or there is a
// bug in the SkyFunction implementation. The environment has already collected the child errors,
// so we just commit this node and signal its parents.
if (uniqueNewDeps.isEmpty() && externalDeps == null) {
// TODO(bazel-team): This means a bug in the SkyFunction. What to do?
Preconditions.checkState(
!env.getChildErrorInfos().isEmpty(),
"Evaluation of SkyKey failed and no dependencies were requested: %s %s",
skyKey,
nodeEntry);
Preconditions.checkState(
evaluatorContext.keepGoing(),
"nokeep_going evaluation should have failed on first child error: %s %s %s",
skyKey,
nodeEntry,
env.getChildErrorInfos());
// If the child error was catastrophic, committing this parent to the graph is not
// necessary, but since we don't do error bubbling in catastrophes, it doesn't violate any
// invariants either.
Set<SkyKey> reverseDeps = env.commitAndGetParents(nodeEntry);
evaluatorContext.signalParentsAndEnqueueIfReady(
skyKey, reverseDeps, nodeEntry.getVersion(), determineRestartPriority());
return;
}
// If there are external deps, we register that fact on the NodeEntry before we enqueue
// child nodes in order to prevent the current node from being re-enqueued between here and
// the call to registerExternalDeps below.
if (externalDeps != null) {
nodeEntry.addExternalDep();
}
// We want to split apart the dependencies that existed for this node the last time we did an
// evaluation and those that were introduced in this evaluation. To be clear, the "newDeps" prefix
// refers to deps newly discovered this time around, after a SkyFunction#compute call, and is not
// to be confused with the oldDeps variable, which refers to the last evaluation, i.e. a prior
// call to ParallelEvaluator#eval.
Set<SkyKey> newDepsThatWerentInTheLastEvaluation = Sets.difference(uniqueNewDeps, oldDeps);
Set<SkyKey> newDepsThatWereInTheLastEvaluation =
Sets.difference(uniqueNewDeps, newDepsThatWerentInTheLastEvaluation);
int childEvaluationPriority = determineChildPriority();
InterruptibleSupplier<Map<SkyKey, ? extends NodeEntry>>
newDepsThatWerentInTheLastEvaluationNodes =
graph.createIfAbsentBatchAsync(
skyKey, Reason.RDEP_ADDITION, newDepsThatWerentInTheLastEvaluation);
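// Children that were already deps in the last evaluation have existing graph entries; fetch them
// in a batch and handle them without creating new entries.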
handleKnownChildrenForDirtyNode(
newDepsThatWereInTheLastEvaluation,
graph.getBatch(skyKey, Reason.ENQUEUING_CHILD, newDepsThatWereInTheLastEvaluation),
nodeEntry,
childEvaluationPriority,
/*enqueueParentIfReady=*/ true);
// Due to multi-threading, this can potentially cause the current node to be re-enqueued if
// all 'new' children of this node are already done. Therefore, there should not be any
// code after this loop, as it would potentially race with the re-evaluation in another
// thread.
for (Map.Entry<SkyKey, ? extends NodeEntry> e :
newDepsThatWerentInTheLastEvaluationNodes.get().entrySet()) {
SkyKey newDirectDep = e.getKey();
NodeEntry newDirectDepEntry = e.getValue();
enqueueChild(
skyKey,
nodeEntry,
newDirectDep,
newDirectDepEntry,
/*depAlreadyExists=*/ false,
childEvaluationPriority,
/*enqueueParentIfReady=*/ true);
}
if (externalDeps != null) {
// This can cause the current node to be re-enqueued if all futures are already done.
// This is an exception to the rule above that there must not be code below the for
// loop. It is safe because we call nodeEntry.addExternalDep above, which prevents
// re-enqueueing of the current node in the above loop if externalDeps != null.
evaluatorContext
.getVisitor()
.registerExternalDeps(skyKey, nodeEntry, externalDeps, determineRestartPriority());
}
// Do not put any code here! Any code here can race with a re-evaluation of this same node
// in another thread.
} catch (InterruptedException ie) {
// The current thread can be interrupted at various places during evaluation or while
// committing the result in this method. Since we only register the future(s) with the
// underlying AbstractQueueVisitor in the registerExternalDeps call above, we have to make
// sure that any known futures are correctly canceled if we do not reach that call. Note
// that it is safe to cancel a future multiple times.
cancelExternalDeps(env);
// InterruptedException cannot be thrown by Runnable.run, so we must wrap it.
// Interrupts can be caught by both the Evaluator and the AbstractQueueVisitor.
// The former will unwrap the IE and propagate it as is; the latter will throw a new IE.
throw SchedulerException.ofInterruption(ie, skyKey);
}
}