in src/buildstream_plugins/sources/docker.py [0:0]
def stage(self, directory):
    """Stage the mirrored image layers into ``directory``.

    The layers listed in the manifest are processed in order. For each
    layer the blob is checked with ``_verify_blob``, the whiteout paths
    reported by ``_get_extract_and_remove_files`` are deleted from
    ``directory``, and the layer's remaining members are extracted into a
    temporary directory which is then handed to ``link_files``.

    Args:
        directory: Path of the staging directory to populate.

    Raises:
        SourceError: If the manifest cannot be loaded, if a whiteout path
            would delete something outside ``directory``, or if any
            filesystem, verification or tar error occurs while staging.
    """
    # Local import: only needed for whiteouts that name a directory.
    import shutil

    mirror_dir = self.get_mirror_directory()

    try:
        manifest = self._load_manifest()
    except (OSError, SourceError) as e:
        raise SourceError("Unable to load manifest: {}".format(e)) from e

    try:
        # Resolved staging root with a trailing separator, so that a plain
        # prefix comparison cannot match a sibling such as "<root>-other".
        stage_root = os.path.join(os.path.realpath(directory), "")

        for layer in manifest["layers"]:
            layer_digest = layer["digest"]
            blob_path = os.path.join(mirror_dir, layer_digest + ".tar.gz")

            self._verify_blob(blob_path, expected_digest=layer_digest)
            (
                extract_fileset,
                white_out_fileset,
            ) = self._get_extract_and_remove_files(blob_path)

            # remove files associated with whiteouts
            for white_out_file in white_out_fileset:
                entry_dir, entry_name = os.path.split(
                    os.path.join(directory, white_out_file)
                )
                # Resolve only the parent: the final component may itself
                # be a symlink that the whiteout is meant to remove.
                real_parent = os.path.realpath(entry_dir)
                confined = os.path.join(real_parent, "").startswith(stage_root)

                # Whiteout names come from the layer tarball. Refuse
                # absolute paths, "..", and parents that resolve outside
                # the staging directory, mirroring the protection applied
                # to extraction below.
                if entry_name in ("", ".", "..") or not confined:
                    raise SourceError(
                        "Whiteout path '{}' is outside of the staging directory".format(
                            white_out_file
                        )
                    )

                target = os.path.join(real_parent, entry_name)
                if os.path.isdir(target) and not os.path.islink(target):
                    # A whiteout may name a directory, which os.remove()
                    # cannot delete; remove it and its contents.
                    shutil.rmtree(target)
                else:
                    os.remove(target)

            # extract files for the current layer
            with tarfile.open(blob_path, tarinfo=ReadableTarInfo) as tar:
                with self.tempdir() as td:
                    if hasattr(tarfile, "tar_filter"):
                        # Python 3.12+ (and older versions with backports)
                        tar.extraction_filter = tarfile.tar_filter
                    else:
                        # No extraction filters available: check each
                        # member explicitly before extracting.
                        for member in extract_fileset:
                            self._assert_tarinfo_safe(member, td)
                    tar.extractall(path=td, members=extract_fileset)
                    link_files(td, directory)

    except (OSError, SourceError, tarfile.TarError) as e:
        raise SourceError("{}: Error staging source: {}".format(self, e)) from e