# blogs/finspace_redshift-2021-09/finspace.py
def read_view_as_pandas(self, dataset_id: str, view_id: str):
    """
    Returns a pandas DataFrame with the contents of the given view of the given dataset.
    Views in FinSpace can be quite large, so be careful!

    :param dataset_id: ID of the FinSpace dataset that the view belongs to
    :param view_id: ID of the view (materialization) to read

    :return: pandas DataFrame with all data of the view, or None if the view is not ready
    """
    import awswrangler as wr  # use awswrangler to read the table via the Glue catalog
    # @todo: switch to DescribeMaterialization when available in HFS
    # Look up the view's metadata; exactly one view should match the given ID
    views = self.list_views(dataset_id=dataset_id, max_results=50)
    filtered = [v for v in views if v['id'] == view_id]

    if len(filtered) == 0:
        raise Exception(f'No view found with id: {view_id}')
    if len(filtered) > 1:
        raise Exception(f'Multiple views found with id: {view_id}')

    view = filtered[0]
    # Ensure the view's materialization succeeded before trying to read it
    if view['status'] != 'SUCCESS':
        status = view['status']
        print(f'view run status is not ready: {status}. Returning empty.')
        return None
    # The view is materialized as a Glue table; resolve its database and table name
    glue_db_name = view['destinationTypeProperties']['databaseName']
    glue_table_name = view['destinationTypeProperties']['tableName']

    # Determine whether the table has partitions; partitioned tables are read differently
    p = wr.catalog.get_partitions(table=glue_table_name, database=glue_db_name, boto3_session=self._boto3_session)
    def no_filter(partitions):
        return len(partitions) > 0
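    # Note: awswrangler invokes partition_filter once per partition with a dict of
    # partition column names to values and keeps the partition when it returns True,
    # so no_filter accepts every partition (i.e. reads the full dataset).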
    if len(p) == 0:
        # No partitions: query the Glue table directly with wrangler
        df = wr.s3.read_parquet_table(table=glue_table_name, database=glue_db_name,
                                      boto3_session=self._boto3_session)
    else:
        # Partitioned: read the parquet dataset from the table's S3 location
        spath = wr.catalog.get_table_location(table=glue_table_name, database=glue_db_name,
                                              boto3_session=self._boto3_session)
        cpath = wr.s3.list_directories(f"{spath}/*", boto3_session=self._boto3_session)

        read_path = f"{spath}/"

        # Just one child directory? Read from it directly
        if len(cpath) == 1:
            read_path = cpath[0]

        df = wr.s3.read_parquet(read_path, dataset=True, partition_filter=no_filter,
                                boto3_session=self._boto3_session)

    return df
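
# A minimal usage sketch (hypothetical names: assumes this method belongs to the
# FinSpace helper class defined in this file and that valid IDs are supplied):
#
#   hfs = FinSpace()
#   df = hfs.read_view_as_pandas(dataset_id='<dataset-id>', view_id='<view-id>')
#   if df is not None:
#       print(df.shape)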