in components/processing/libs/processor-base/src/processors/base/gcsio.py [0:0]
def write_folder(self) -> Iterator[str]:
"""Writable local folder that will uploaded if necessary."""
logger.debug("Writing to %s as local folder", str(self))
if not self.bucket:
yield str(self.path)
return
with tempfile.TemporaryDirectory() as d:
logger.debug("Staging in local folder %s", d)
# Return temporary directory
yield d
# Upload objects to GCS
for root, _, files in os.walk(d):
for file in files:
obj_path = str(Path(self.path, Path(root).relative_to(d), file))
logger.debug("Uploading %s to %s", Path(root, file), obj_path)
self.bucket.blob(obj_path).upload_from_filename(
Path(root, file),
content_type=get_mimetype(obj_path),
)