in dataflux_pytorch/lightning/gcs_filesystem.py [0:0]
def create_stream(self, path: Union[str, os.PathLike],
                  mode: str) -> Generator[io.IOBase, None, None]:
    """Yield a binary stream for reading or writing a single GCS object.

    Args:
        path: GCS location of the object; it is split into a bucket name
            and an object name by ``parse_gcs_path``.
        mode: ``"wb"`` to write the object or ``"rb"`` to read it.

    Yields:
        For ``"wb"``, the stream produced by entering
        ``DatafluxCheckpointBuffer(blob)``; the buffer's own context-manager
        exit runs once the consumer is done with the stream.
        For ``"rb"``, an ``io.BytesIO`` positioned at offset 0 that holds
        the downloaded object contents. It is closed when the consumer is
        done, so it must not be used after that point.

    Raises:
        ValueError: If ``mode`` is neither ``"rb"`` nor ``"wb"``.
    """
    # Fail fast: reject unsupported modes before parsing the path or
    # building any bucket/blob handles.
    if mode not in ("rb", "wb"):
        raise ValueError(
            "Invalid mode argument, create_stream only supports rb (read mode) & wb (write mode)"
        )

    # Keep the caller's ``path`` untouched; use a separate name for the
    # object name inside the bucket.
    bucket, object_name = parse_gcs_path(path)
    blob = self.storage_client.bucket(bucket).blob(object_name)

    if mode == "wb":  # write mode.
        if self.debug:
            print(
                f"Creating Stream, Write Mode: Rank: {dist.get_rank()} Bucket: {bucket} path: {object_name}"
            )
        with DatafluxCheckpointBuffer(blob) as stream:
            yield stream
        return

    # read mode.
    if self.debug:
        print(
            f"Creating Stream, Read Mode: Rank: {dist.get_rank()} Bucket: {bucket} path: {object_name}"
        )
    # The whole object is downloaded into memory. The ``with`` block closes
    # the buffer (even if the consumer raises) so those bytes are released
    # promptly instead of lingering until garbage collection.
    with io.BytesIO() as stream:
        blob.download_to_file(stream)
        # Rewind so the consumer reads from the start of the object.
        stream.seek(0)
        yield stream