in mwaairflow/assets/plugins/operators/azure_blob_list_to_s3.py [0:0]
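# Likely module-level imports for this method (an assumption: the excerpt starts
# at execute(), so the file's actual import block is not shown). The hook paths
# below are the Airflow 2 provider packages.
import tempfile

from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook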
def execute(self, context: dict) -> list:
    azure_hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
    s3_hook = S3Hook(aws_conn_id=self.aws_conn_id)
print("Listing blob from: %s", self.blob_list_path_file)
    s3_list = []
    with open(self.blob_list_path_file, "r") as f:
        for blob in f:
            # Each line lists one blob as "<blob_name>,<s3_object_key_no_prefix>,<size_in_bytes>"
            blob = blob.rstrip("\n")
            blob_fields = blob.split(",")
            blob_name = blob_fields[0]
            s3_object_key_no_prefix = blob_fields[1]
            blob_size = int(blob_fields[2])
            self.log.info("blob_name: %s", blob_name)
            with tempfile.NamedTemporaryFile() as temp_file:
                self.log.info(
                    "Downloading data from container: %s and blob: %s to temp file: %s",
                    self.container_name,
                    blob_name,
                    temp_file.name,
                )
                azure_hook.get_file(
                    file_path=temp_file.name,
                    container_name=self.container_name,
                    blob_name=blob_name,
                )
                s3_object_key = self.s3_prefix + s3_object_key_no_prefix
                upload_or_replace = False
                if s3_hook.check_for_key(
                    key=s3_object_key, bucket_name=self.bucket_name
                ):
                    s3_object_size = s3_hook.get_key(
                        key=s3_object_key, bucket_name=self.bucket_name
                    ).content_length
                    self.log.info(
                        "Object exists on Amazon S3 in bucket: %s with key: %s and size: %s",
                        self.bucket_name,
                        s3_object_key,
                        s3_object_size,
                    )
                    if blob_size != s3_object_size:
                        upload_or_replace = True
                        self.log.info(
                            "Object does not have the same size on Amazon S3 as on Azure Blob Storage."
                        )
                    else:
                        self.log.info(
                            "Object has the same size on Amazon S3 as on Azure Blob Storage. Upload to Amazon S3 will be skipped."
                        )
                else:
                    self.log.info(
                        "Object does not exist on Amazon S3 in bucket: %s with key: %s",
                        self.bucket_name,
                        s3_object_key,
                    )
                    upload_or_replace = True
                if upload_or_replace:
                    self.log.info(
                        "Uploading data to Amazon S3 key: %s in bucket: %s",
                        s3_object_key,
                        self.bucket_name,
                    )
                    s3_hook.load_file(
                        filename=temp_file.name,
                        key=s3_object_key,
                        bucket_name=self.bucket_name,
                        replace=self.replace,
                        encrypt=self.encrypt,
                        gzip=self.gzip,
                        acl_policy=self.acl_policy,
                    )
                    self.log.info(
                        "Blob data has been uploaded to Amazon S3 key: %s in bucket: %s",
                        s3_object_key,
                        self.bucket_name,
                    )
                # Record the destination S3 URI for every blob in the list file
                s3_list.append(f"s3://{self.bucket_name}/{s3_object_key}")
    return s3_list
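
# A minimal sketch of the blob-list file format execute() parses: one
# comma-separated line per blob, "<blob_name>,<s3_object_key_no_prefix>,<size_in_bytes>".
# The helper name, file path, and blob names below are illustrative assumptions,
# not taken from the source.
def write_example_blob_list(path: str = "/tmp/blob_list.csv") -> None:
    rows = [
        ("exports/2023/01/data.parquet", "2023/01/data.parquet", 1048576),
        ("exports/2023/02/data.parquet", "2023/02/data.parquet", 2097152),
    ]
    with open(path, "w") as f:
        for blob_name, s3_key_no_prefix, size in rows:
            # Matches the parsing in execute(): fields split on "," and size cast to int
            f.write(f"{blob_name},{s3_key_no_prefix},{size}\n")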