in cid/helpers/cur.py [0:0]
def metadata(self) -> dict:
if not self._metadata:
try:
# Look other tables
tables = self.athena.list_table_metadata()
# Filter tables with type = 'EXTERNAL_TABLE'
tables = [v for v in tables if v.get('TableType') == 'EXTERNAL_TABLE']
# Filter tables having CUR structure
for table in tables.copy():
columns = [c.get('Name') for c in table.get('Columns')]
if not all([c in columns for c in self.requiredColumns]):
tables.remove(table)
if len(tables) == 1:
self._metadata = tables[0]
self._tableName = self._metadata.get('Name')
elif len(tables) > 1:
self._tableName=questionary.select(
"Multiple CUR tables found, please select one",
choices=[v.get('Name') for v in tables]
).ask()
self._metadata = self.athena.get_table_metadata(self._tableName)
except Exception as e:
# For other errors dump the message
print(json.dumps(e, indent=4, sort_keys=True, default=str))
return self._metadata