in services/worker/src/worker/job_runners/config/parquet_and_info.py [0:0]
def __enter__(self) -> "track_reads":
    """Install the read-tracking patches and return the tracker.

    While the context is active:

    - ``LocalFileSystem.open``, ``HTTPFileSystem.open`` and ``HfFileSystem.open`` are replaced
      by a wrapper that calls the original ``open`` and, for files not opened with a ``"w"``
      mode, swaps the read functions of the returned file for tracked versions.
    - ``datasets.utils.file_utils.is_local_path`` is patched to return ``False``.
    - ``ZipFileSystem.__init__`` is routed through ``self.track_metadata_read_once``.

    Every patch is registered on ``self.exit_stack``. If installing any of them fails, the
    patches already applied are reverted before the exception propagates: ``__exit__`` is not
    called when ``__enter__`` raises, so they would otherwise stay installed.

    Returns:
        This tracker instance.
    """
    # `wrapped` has its own `self` parameter (the filesystem instance), so keep a separate
    # name for the tracker that the closure can refer to.
    tracker = self

    # Track file reads from local, http and hf file-systems.
    # To do so, we replace LocalFileSystem.open, HTTPFileSystem.open and HfFileSystem.open
    # by wrappers that modify the output file read functions with tracked read functions.
    def wrapped(
        self: Union[LocalFileSystem, HTTPFileSystem, HfFileSystem],
        urlpath: str,
        mode: str = "rb",
        *args: Any,
        fs_open: Callable[..., FsspecFile],
        **kwargs: Any,
    ) -> FsspecFile:
        """Open ``urlpath`` with the original ``open`` and attach read tracking to the result.

        Args:
            self: The filesystem instance whose ``open`` was called.
            urlpath: Path handed to ``open``; its protocol-qualified form is the tracking key.
            mode: Open mode. Files whose mode contains ``"w"`` are returned untouched.
            *args: Extra positional arguments forwarded to the original ``open``.
            fs_open: The original, unpatched ``open`` function of the filesystem class.
            **kwargs: Extra keyword arguments forwarded to the original ``open``.

        Returns:
            The file object returned by ``fs_open``.
        """
        f = fs_open(self, urlpath, mode, *args, **kwargs)
        # Key the statistics by the full URL (protocol included).
        urlpath = self.unstrip_protocol(urlpath)
        if "w" not in mode:
            f.read = functools.partial(tracker.track_read, urlpath, f.read)
            # NOTE(review): implicit iteration (`for line in f`) looks `__iter__` up on the
            # type, so this instance-level override is only seen by explicit `f.__iter__()`
            # calls - confirm this covers the intended callers.
            f.__iter__ = functools.partial(tracker.track_iter, urlpath, f.__iter__)
            if hasattr(f, "read1"):
                f.read1 = functools.partial(tracker.track_read, urlpath, f.read1)
            if hasattr(f, "readline"):
                f.readline = functools.partial(tracker.track_read, urlpath, f.readline)
            if hasattr(f, "readlines"):
                f.readlines = functools.partial(tracker.track_read, urlpath, f.readlines)
            # Register the file only the first time it is seen, so that reopening it does
            # not reset its "read" counter.
            if urlpath not in tracker.files:
                tracker.files[urlpath] = {"read": 0, "size": int(f.size)}
        return f

    try:
        # Use an exit_stack to be able to un-do all the replacements once the track_reads context ends.
        # Use patch.object to apply the replacement, and autospec=True to handle methods replacements properly.
        # Apply the wrapped open function using `side_effect`.
        for fs_cls in (LocalFileSystem, HTTPFileSystem, HfFileSystem):
            # Grab the real `open` before patching so the wrapper can delegate to it.
            original_open = fs_cls.open
            patched_open = self.exit_stack.enter_context(patch.object(fs_cls, "open", autospec=True))
            patched_open.side_effect = functools.partial(wrapped, fs_open=original_open)
        # always use fsspec even for local paths
        self.exit_stack.enter_context(patch("datasets.utils.file_utils.is_local_path", return_value=False))
        # zip central directories are read over and over again, let's track it only once
        zip_init = ZipFileSystem.__init__
        mock_zip_init = self.exit_stack.enter_context(patch.object(ZipFileSystem, "__init__", autospec=True))
        mock_zip_init.side_effect = functools.partial(self.track_metadata_read_once, func=zip_init)
    except BaseException:
        # A failed __enter__ never reaches __exit__: revert whatever was already patched so
        # the filesystem classes are not left monkeypatched for the rest of the process.
        self.exit_stack.close()
        raise
    return self