def _execute_action()

in src/buildstream/sandbox/_sandboxremote.py [0:0]


    def _execute_action(self, action, flags):
        """Run *action* through the remote, reusing a cached result when one exists.

        The serialized action is added to the local CAS and looked up via
        ``self._check_action_cache()``. When that lookup yields nothing, the
        blobs of the input root which the storage remote lacks are pushed
        (pulling any that are also absent locally from the artifact cache
        first), together with the command and action blobs, and the action
        is then executed with ``self.run_remote_command()``.

        Afterwards every output directory of the result is fetched from the
        storage remote and the remote stdout/stderr are copied into the
        streams returned by ``self._get_output()``.

        Args:
            action: Message providing ``SerializeToString()``,
                ``input_root_digest`` and ``command_digest``.
            flags: Not used by this method.

        Returns:
            The action result, either from the action cache or extracted
            from the executed operation.

        Raises:
            SandboxError: If the missing blobs cannot be determined, pulled
                from the artifact cache or pushed to the remote, or if an
                output directory carries neither a root directory digest
                nor a tree digest.
        """
        stdout, stderr = self._get_output()

        context = self._get_context()
        project = self._get_project()
        cas = context.get_cascache()
        artifacts = context.artifactcache

        action_digest = cas.add_object(buffer=action.SerializeToString())
        remote = self.storage_remote

        # A hit in the action cache spares us the upload and the execution.
        result = self._check_action_cache(action_digest)

        if not result:
            upload_activity = self._get_context().messenger.timed_activity(
                "Uploading input root", element_name=self._get_element_name()
            )
            with upload_activity:
                # Work out which blobs of the input root the remote lacks.
                try:
                    to_upload = list(cas.missing_blobs_for_directory(action.input_root_digest, remote=remote))
                except grpc.RpcError as err:
                    raise SandboxError("Failed to determine missing blobs: {}".format(err)) from err

                # Some of those may be absent locally as well (partial
                # artifact); get them from the artifact cache before pushing.
                try:
                    absent_locally = cas.missing_blobs(to_upload)
                    if absent_locally:
                        artifacts.fetch_missing_blobs(project, absent_locally)
                except (grpc.RpcError, BstError) as err:
                    raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(err)) from err

                # The command and action messages have to be pushed too.
                to_upload.extend((action.command_digest, action_digest))

                try:
                    cas.send_blobs(remote, to_upload)
                except grpc.RpcError as err:
                    raise SandboxError("Failed to push source directory to remote: {}".format(err)) from err

            # Everything is in place remotely: execute the action.
            operation = self.run_remote_command(action_digest)
            result = self._extract_action_result(operation)

        # Pull the complete structure of each output directory, preferring
        # the root directory digest and falling back to the tree digest.
        for outdir in result.output_directories:
            root_digest = outdir.root_directory_digest
            if root_digest and root_digest.hash:
                cas.fetch_directory(remote, root_digest)
            else:
                tree_digest = outdir.tree_digest
                if tree_digest and tree_digest.hash:
                    cas.pull_tree(remote, tree_digest)
                else:
                    raise SandboxError("Output directory structure had no digest attached.")

        # Make the stdout and stderr blobs available locally.
        log_digests = [result.stdout_digest, result.stderr_digest]
        cas.fetch_blobs(remote, log_digests)

        # Forward each remote stream to its local counterpart: the blob is
        # used when a digest is set, otherwise the inline raw bytes, if any.
        for sink, channel in ((stdout, "stdout"), (stderr, "stderr")):
            if not sink:
                continue

            digest = getattr(result, channel + "_digest")
            if digest.hash:
                with cas.open(digest, "r") as blob:
                    shutil.copyfileobj(blob, sink)
            else:
                raw = getattr(result, channel + "_raw")
                if raw:
                    sink.write(str(raw, "utf-8", errors="ignore"))

        return result