in smallpond/io/filesystem.py [0:0]
def dump(obj: Any, path: str, buffering=2 * MB, atomic_write=False) -> int:
    """
    Serialize an object with cloudpickle, zstd-compress it and write it to a file.

    Args:
      obj: The object to dump.
      path: The path to the file to dump the object to.
      buffering: The buffer size (in bytes) used when opening the output file.
        It also serves as the threshold above which a "large pickle file"
        debug message is logged.
      atomic_write: If true, the data is first written to a temporary file in
        the same directory as `path` and then moved over `path`, so `path`
        never holds a partially written file. The temporary file is removed
        if anything fails before the move completes.
    Returns:
      The size of the written (compressed) file in bytes.
    Raises:
      zstd.ZstdError: Compression errors are re-raised unchanged.
      Exception: Any other serialization error. When `dill` is available and
        also fails on `obj`, the `dill` error is raised, chained from the
        original cloudpickle error.
    """

    def get_pickle_trace(obj):
        # `dill` is an optional diagnostic dependency; without it no trace
        # can be produced.
        try:
            import dill
            import dill.detect
        except ImportError:
            return None, None

        pickle_trace = io.StringIO()
        pickle_error = None
        with dill.detect.trace(pickle_trace):
            try:
                dill.dumps(obj, recurse=True)
            except Exception as ex:
                pickle_error = ex
        return pickle_trace.getvalue(), pickle_error

    def write_to_file(fout):
        # closefd=False: leaving the `with` block finishes the zstd stream but
        # keeps `fout` open so the caller can still query the file size.
        with zstd.ZstdCompressor().stream_writer(fout, closefd=False) as zstd_writer:
            try:
                cloudpickle.dump(obj, zstd_writer)
            except zstd.ZstdError:
                # Compression failures are not pickling problems: no trace needed.
                raise
            except Exception as ex:
                trace_str, trace_err = get_pickle_trace(obj)
                logger.opt(exception=ex).error(f"pickle trace of {repr(obj)}:{os.linesep}{trace_str}")
                if trace_err is None:
                    raise
                raise trace_err from ex
            logger.trace("{} saved to {}", repr(obj), path)

    size = 0

    if atomic_write:
        directory, filename = os.path.split(path)
        tmp_path = None
        try:
            # The temporary file lives next to the target so the final move
            # stays on one filesystem.
            with tempfile.NamedTemporaryFile(
                "wb", buffering=buffering, dir=directory, prefix=filename, delete=False
            ) as fout:
                tmp_path = fout.name
                write_to_file(fout)
                fout.seek(0, os.SEEK_END)
                size = fout.tell()
            # os.replace overwrites an existing destination on every platform,
            # whereas os.rename fails on Windows if `path` already exists.
            os.replace(tmp_path, path)
        except BaseException:
            # delete=False means nobody else cleans up: do not leave a stray
            # temporary file behind when the dump or the move fails.
            if tmp_path is not None:
                try:
                    os.unlink(tmp_path)
                except OSError:
                    pass
            raise
    else:
        with open(path, "wb", buffering=buffering) as fout:
            write_to_file(fout)
            fout.seek(0, os.SEEK_END)
            size = fout.tell()

    if size >= buffering:
        logger.debug(f"created a large pickle file ({size/MB:.3f}MB): {path}")
    return size