in src/buildstream/sandbox/_sandboxremote.py [0:0]
def _execute_action(self, action, flags):
stdout, stderr = self._get_output()
context = self._get_context()
project = self._get_project()
cascache = context.get_cascache()
artifactcache = context.artifactcache
action_digest = cascache.add_object(buffer=action.SerializeToString())
casremote = self.storage_remote
# check action cache download and download if there
action_result = self._check_action_cache(action_digest)
if not action_result:
with self._get_context().messenger.timed_activity(
"Uploading input root", element_name=self._get_element_name()
):
# Determine blobs missing on remote
try:
input_root_digest = action.input_root_digest
missing_blobs = list(cascache.missing_blobs_for_directory(input_root_digest, remote=casremote))
except grpc.RpcError as e:
raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e
# Check if any blobs are also missing locally (partial artifact)
# and pull them from the artifact cache.
try:
local_missing_blobs = cascache.missing_blobs(missing_blobs)
if local_missing_blobs:
artifactcache.fetch_missing_blobs(project, local_missing_blobs)
except (grpc.RpcError, BstError) as e:
raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e
# Add command and action messages to blob list to push
missing_blobs.append(action.command_digest)
missing_blobs.append(action_digest)
# Now, push the missing blobs to the remote.
try:
cascache.send_blobs(casremote, missing_blobs)
except grpc.RpcError as e:
raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
# Now request to execute the action
operation = self.run_remote_command(action_digest)
action_result = self._extract_action_result(operation)
# Fetch outputs
for output_directory in action_result.output_directories:
# Now do a pull to ensure we have the full directory structure.
# We first try the root_directory_digest we requested, then fall back to tree_digest
root_directory_digest = output_directory.root_directory_digest
if root_directory_digest and root_directory_digest.hash:
cascache.fetch_directory(casremote, root_directory_digest)
continue
tree_digest = output_directory.tree_digest
if tree_digest and tree_digest.hash:
cascache.pull_tree(casremote, tree_digest)
continue
raise SandboxError("Output directory structure had no digest attached.")
# Fetch stdout and stderr blobs
cascache.fetch_blobs(casremote, [action_result.stdout_digest, action_result.stderr_digest])
# Forward remote stdout and stderr
if stdout:
if action_result.stdout_digest.hash:
with cascache.open(action_result.stdout_digest, "r") as f:
shutil.copyfileobj(f, stdout)
elif action_result.stdout_raw:
stdout.write(str(action_result.stdout_raw, "utf-8", errors="ignore"))
if stderr:
if action_result.stderr_digest.hash:
with cascache.open(action_result.stderr_digest, "r") as f:
shutil.copyfileobj(f, stderr)
elif action_result.stderr_raw:
stderr.write(str(action_result.stderr_raw, "utf-8", errors="ignore"))
return action_result