in assets/mwaa_dags/dags/scripts/mwaa_blogpost_data_pipeline.py [0:0]
def download_dataset(**context):
    """Download a zip archive over HTTP and copy its CSV members into S3.

    Used as an Airflow ``python_callable`` that receives the task context as
    keyword arguments. Each archive member whose name ends in ``.csv`` is
    uploaded (overwriting any existing object) to::

        s3://<bucket_name>/<folder>/<bucket_partition>/<file name>

    After all members are processed, a ``_SUCCESS`` object containing the
    string ``'SUCCESS'`` is written at the root of the bucket.

    Context keys read:
        endpoint_path: URL of the zip archive to download.
        templates_dict['bucket_name']: destination S3 bucket.
        templates_dict['bucket_partition']: path segment placed between the
            folder name and the file name in each object key.
        request_timeout (optional): ``requests`` timeout, in seconds or as a
            ``(connect, read)`` tuple. Defaults to ``(10, 300)``.

    Returns:
        True once the CSV members and the ``_SUCCESS`` marker are uploaded.
        False if the HTTP response has an error status (4xx/5xx); in that
        case nothing is written to S3.

    Raises:
        requests exceptions on connection failure or timeout, and any error
        raised by ``zipfile`` or ``S3Hook`` while reading/uploading.
    """
    endpoint_path = context['endpoint_path']
    bucket_name = context['templates_dict']['bucket_name']
    bucket_partition = context['templates_dict']['bucket_partition']
    # Without a timeout, requests can wait forever on a stalled connection and
    # pin the Airflow worker slot; callers may override via the task context.
    request_timeout = context.get('request_timeout', (10, 300))
    s3_hook = S3Hook(aws_conn_id='aws_default')

    response = requests.get(endpoint_path, timeout=request_timeout)
    # Response.ok is False for 4xx/5xx statuses (the same test the original
    # truthiness check on the Response object performed).
    if not response.ok:
        return False

    with zipfile.ZipFile(BytesIO(response.content)) as archive:
        for member_name in archive.namelist():
            if not member_name.endswith('.csv'):
                continue
            print(member_name)
            # Zip member names always use '/' as the path separator.
            s3_object_name = member_name.rpartition('/')[2]
            # NOTE(review): str.rstrip('.csv') removes any trailing run of the
            # characters '.', 'c', 's', 'v' -- not the '.csv' suffix -- so
            # 'movies.csv' -> 'movie' and 'ratings.csv' -> 'rating'. Kept as-is
            # because downstream jobs may read from these prefixes; TODO confirm
            # and switch to removesuffix('.csv') if nothing depends on them.
            s3_folder_name = s3_object_name.rstrip('.csv')
            s3_hook.load_bytes(
                bucket_name=bucket_name,
                key=f'{s3_folder_name}/{bucket_partition}/{s3_object_name}',
                bytes_data=archive.read(member_name),
                replace=True,
            )

    # Write the success marker. NOTE(review): it goes to the bucket root, not
    # under bucket_partition -- confirm that is what downstream checks expect.
    s3_hook.load_string(bucket_name=bucket_name, key='_SUCCESS', string_data='SUCCESS', replace=True)
    return True