in torchdata/datapipes/iter/util/cacheholder.py [0:0]
def __new__(cls, datapipe, mode="w", filepath_fn=None, *, same_filepath_fn=False, skip_read=False):
    if filepath_fn is not None and same_filepath_fn:
        raise ValueError("`filepath_fn` is mutually exclusive with `same_filepath_fn`")

    # Walk the DataPipe graph to find the most recent `OnDiskCacheHolder`.
    graph = traverse(datapipe, exclude_primitive=True)
    cache_holder = EndOnDiskCacheHolderIterDataPipe._recursive_search(graph)
    if cache_holder is None:
        raise RuntimeError("Expected `OnDiskCacheHolder` existing in pipeline when `end_caching` is invoked")
    if cache_holder._end_caching_flag:
        raise RuntimeError("`end_caching` can only be invoked once per `OnDiskCacheHolder`")

    # Retrieve the filepath function and hash settings registered by the
    # `OnDiskCacheHolder`, then close its caching region. `_end_caching()`
    # returns a DataPipe over the files that are already cached on disk.
    _filepath_fn, _hash_dict, _hash_type, _ = OnDiskCacheHolderIterDataPipe._temp_dict[cache_holder]
    cached_dp = cache_holder._end_caching()

    if same_filepath_fn:
        filepath_fn = _filepath_fn

    # Remaining (uncached) branch: unless the caller opts out via `skip_read`,
    # read each item's payload as bytes or text depending on `mode`.
    todo_dp = datapipe
    if not skip_read:
        if "b" in mode:
            todo_dp = todo_dp.map(fn=_read_bytes, input_col=1)
        else:
            todo_dp = todo_dp.map(fn=_read_str, input_col=1)

    if filepath_fn is not None:
        todo_dp = todo_dp.map(fn=filepath_fn, input_col=0)

    # Extra hash check when a hash is provided: raise an error if the data
    # returned from the prior operations does not match the expected hash.
    if _hash_dict is not None:
        todo_dp = todo_dp.check_hash(_hash_dict, _hash_type)

    # Persist the freshly produced data, then serve cached and new files together.
    todo_dp = todo_dp.save_to_disk(mode=mode)

    return cached_dp.concat(todo_dp)
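
For context, a minimal usage sketch of how this `__new__` is reached through the registered functionals: `on_disk_cache` opens the caching region that `end_caching` closes. `IterableWrapper`, `HttpReader`, `on_disk_cache`, and `end_caching` are real torchdata APIs; the URL and the temp-dir `_filepath_fn` are illustrative placeholders, not taken from the file above.

    import os.path
    import tempfile

    from torchdata.datapipes.iter import HttpReader, IterableWrapper

    URL = "https://example.com/data/archive.tar.gz"  # placeholder URL

    def _filepath_fn(url):
        # Cache each remote file under the system temp directory (illustrative choice).
        return os.path.join(tempfile.gettempdir(), os.path.basename(url))

    dp = IterableWrapper([URL])
    # Everything between `on_disk_cache` and `end_caching` runs only on a cache miss.
    dp = dp.on_disk_cache(filepath_fn=_filepath_fn)
    dp = HttpReader(dp)  # yields (url, byte stream) tuples on a miss
    # `end_caching` invokes the `__new__` above: it locates the `OnDiskCacheHolder`,
    # reads the downloaded bytes (mode="wb"), saves them via `save_to_disk`, and
    # concatenates the result with the already-cached files.
    dp = dp.end_caching(mode="wb", same_filepath_fn=True)

    for path in dp:
        print(path)  # local file path, whether freshly downloaded or served from cache

Passing `same_filepath_fn=True` reuses the `filepath_fn` registered on the `OnDiskCacheHolder`, which is why the method above forbids supplying both it and an explicit `filepath_fn`.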