in components/dpu-workflow/src/utils/gcs_utils.py [0:0]
def move(self):
source_bucket = BucketRegistry.get_bucket(self.source_doc.bucket_name)
source_blob = source_bucket.blob(self.source_doc.blob_name)
destination_bucket = BucketRegistry.get_bucket(self.dest_doc.bucket_name)
source_bucket.copy_blob(
source_blob, destination_bucket, self.dest_doc.blob_name
)
if self.move_info:
destination_bucket.blob(
f"{self.dest_doc.blob_name}.json"
).upload_from_string(self.move_info, content_type="application/json")
source_bucket.delete_blob(self.source_doc.blob_name)
logging.info(
f"Moved {self.source_doc.bucket_name}/{self.source_doc.blob_name} "
f"to {self.dest_doc.bucket_name}/{self.dest_doc.blob_name}"
)