def execute()

in mwaairflow/assets/plugins/operators/azure_blob_list_to_s3.py [0:0]


    def execute(self, context: dict) -> list:
        """Copy blobs from Azure Blob Storage to Amazon S3.

        Reads a manifest file (``self.blob_list_path_file``) where each line is
        ``<blob_name>,<s3_key_without_prefix>,<blob_size_in_bytes>``. Each blob is
        downloaded to a temporary file and uploaded to S3 unless an object with the
        same key AND the same size already exists there (size match => skip).

        :param context: Airflow task execution context (unused here).
        :return: List of ``s3://bucket/key`` URIs actually uploaded this run.
        """
        azure_hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id)
        # Was print("...", x): print does NOT interpolate %-style lazy args,
        # so the placeholder and the value were printed verbatim. Use the task
        # logger like the rest of this method.
        self.log.info("Listing blob from: %s", self.blob_list_path_file)

        s3_list = []

        with open(self.blob_list_path_file, "r") as f:
            for line in f:
                # Split once instead of three times; extra fields (if any)
                # beyond the first three are ignored, as before.
                fields = line.rstrip("\n").split(",")
                blob_name = fields[0]
                s3_object_key_no_prefix = fields[1]
                blob_size = int(fields[2])

                self.log.info("blob_name: %s", blob_name)

                with tempfile.NamedTemporaryFile() as temp_file:
                    self.log.info(
                        "Downloading data from container: %s and blob: %s on temp_file: %s",
                        self.container_name,
                        blob_name,
                        temp_file.name,
                    )

                    azure_hook.get_file(
                        file_path=temp_file.name,
                        container_name=self.container_name,
                        blob_name=blob_name,
                    )

                    s3_object_key = self.s3_prefix + s3_object_key_no_prefix

                    upload_or_replace = False

                    if s3_hook.check_for_key(
                        key=s3_object_key, bucket_name=self.bucket_name
                    ):
                        s3_object_size = s3_hook.get_key(
                            key=s3_object_key, bucket_name=self.bucket_name
                        ).content_length
                        self.log.info(
                            "Object exists on s3 on bucket_name: %s and key: %s with size: %s ",
                            self.bucket_name,
                            s3_object_key,
                            s3_object_size,
                        )

                        # Size is the only equality check — no checksum/etag
                        # comparison, so same-size-but-different content is skipped.
                        if blob_size != s3_object_size:
                            upload_or_replace = True
                            self.log.info(
                                "Object does not have the same size on Amazon S3 as on Azure Blob Storage."
                            )
                        else:
                            self.log.info(
                                "Object has the same size on Amazon S3 as on Azure Blob Storage. Upload to Amazon S3 will be discarded"
                            )
                    else:
                        self.log.info(
                            "Object doesn't exist on s3 on bucket_name: %s and key: %s ",
                            self.bucket_name,
                            s3_object_key,
                        )
                        upload_or_replace = True

                    if upload_or_replace:
                        self.log.info(
                            "Uploading data from blob's: %s into Amazon S3 bucket: %s",
                            s3_object_key,
                            self.bucket_name,
                        )
                        s3_hook.load_file(
                            filename=temp_file.name,
                            key=s3_object_key,
                            bucket_name=self.bucket_name,
                            replace=self.replace,
                            encrypt=self.encrypt,
                            gzip=self.gzip,
                            acl_policy=self.acl_policy,
                        )
                        self.log.info(
                            "Resources have been uploaded from blob: %s to Amazon S3 bucket:%s",
                            s3_object_key,
                            self.bucket_name,
                        )
                        s3_list.append(f"s3://{self.bucket_name}/{s3_object_key}")
        return s3_list