in components/processing/libs/processor-base/src/processors/base/gcsio.py [0:0]
def copy(self, idest: str | TGCSPath, delete_orig=False):
"""Move current object (file) to a target object (file), optionally deleting original"""
if isinstance(idest, str):
dest = GCSPath(idest)
else:
dest = idest
# If the same, stop now
if str(self) == str(dest):
return
# Copy two GCS objects
def gcs_rewrite(source, dest):
src = source.bucket.blob(source.path)
dst = dest.bucket.blob(dest.path)
token, _, _ = dst.rewrite(source=src)
while token is not None:
token, _, _ = dst.rewrite(source=src, token=token)
# Make local directories if necessary
if not dest.bucket:
os.makedirs(Path(dest.path).parent, exist_ok=True)
if self.bucket:
if dest.bucket:
logger.debug("Copying remotely from %s to %s", str(self), str(dest))
gcs_rewrite(self, dest)
if delete_orig:
self.delete()
else:
# Download from GCS
logger.debug("Downloading from %s to %s", str(self), str(dest))
self.bucket.blob(self.path).download_to_filename(dest.path)
if delete_orig:
self.delete()
else:
if dest.bucket:
# Upload to GCS
logger.debug("Uploading from %s to %s", str(self), str(dest))
dest.bucket.blob(dest.path).upload_from_filename(
str(self),
content_type=get_mimetype(dest.path),
)
if delete_orig:
self.delete()
else:
if delete_orig:
# Move locally
logger.debug("Moving locally from %s to %s", str(self), str(dest))
Path(self.path).rename(dest.path)
else:
# Copy locally
logger.debug("Copying locally from %s to %s", str(self), str(dest))
shutil.copyfile(self.path, dest.path)