in cid/helpers/account_map.py [0:0]
def create(self, name) -> bool:
    """Create the account map view ``name`` in Athena.

    The view SQL is built from a metadata table when possible:

    * If ``self.accounts`` is truthy, autodiscovery is skipped.
    * Otherwise, when ``self._AthenaTableName`` is not set, the tables
      returned by ``self.athena.list_table_metadata()`` are searched for
      an ``EXTERNAL_TABLE`` whose name is listed under
      ``MetadataTableNames`` in ``self.defaults``. The first match is
      adopted only if it has every column required by the mapping
      configured for ``name`` and that table.
    * Once a metadata table name is known, the query template registered
      for ``name`` is loaded and filled in with the table names and the
      field mappings.

    If that path is skipped, finds nothing, or fails, the SQL is obtained
    from ``self.create_account_mapping_sql(name)`` instead. The resulting
    query is then executed through ``self.athena``.

    Args:
        name: Name of the account map view; also the key used to look up
            its field mappings and its view definition.

    Returns:
        Nothing is returned explicitly.
        NOTE(review): the signature is annotated ``-> bool`` but the
        original never returned a value; kept as-is so callers see no
        change -- confirm the intended contract.

    Raises:
        Whatever ``self.create_account_mapping_sql`` or the Athena calls
        raise; those are deliberately not swallowed.
    """
    print(f'\nCreating {name}...')
    logger.info(f'Creating account mapping "{name}"...')

    # Stays None unless the metadata-table path produces a query.
    compiled_query = None
    try:
        if self.accounts:
            logger.info('Account information found, skipping autodiscovery')
        else:
            if not self._AthenaTableName:
                # Autodiscover
                print('\tautodiscovering...', end='')
                logger.info('Autodiscovering metadata table...')
                tables = [
                    t for t in self.athena.list_table_metadata()
                    if t.get('TableType') == 'EXTERNAL_TABLE'
                    and t.get('Name') in self.defaults.get('MetadataTableNames')
                ]
                if tables:
                    table = tables[0]
                    table_name = table.get('Name')
                    logger.info(f"Detected metadata table {table_name}")
                    field_found = [col.get('Name') for col in table.get('Columns')]
                    field_required = list(self.mappings.get(name).get(table_name).values())
                    logger.info(f"Detected fields: {field_found}")
                    logger.info(f"Required fields: {field_required}")
                    # Check if we have all the required fields
                    if all(field in field_found for field in field_required):
                        logger.info('All required fields found')
                        self._AthenaTableName = table_name
                    else:
                        logger.info('Missing required fields')

            if self._AthenaTableName:
                # Query path
                view_definition = self.athena._resources.get('views').get(name, dict())
                view_file = view_definition.get('File')
                template = Template(
                    resource_string(
                        view_definition.get('providedBy'),
                        f'data/queries/{view_file}',
                    ).decode('utf-8')
                )
                # Fill in the template placeholders
                columns_tpl = {
                    'metadata_table_name': self._AthenaTableName,
                    'cur_table_name': self.cur.tableName,
                }
                for k, v in self.mappings.get(name).get(self._AthenaTableName).items():
                    logger.info(f'Mapping field {k} to {v}')
                    columns_tpl[k] = v
                compiled_query = template.safe_substitute(columns_tpl)
                print('compiled view.')
            else:
                logger.info('Metadata table not found')
                print('account metadata not detected')
    except Exception:
        # Best effort: any failure on the metadata path falls back to the
        # alternative below. Unlike a bare ``except:``, this lets
        # KeyboardInterrupt / SystemExit propagate.
        logger.debug('Building account map from metadata table failed', exc_info=True)
        compiled_query = None

    if compiled_query is None:
        # Outside the try block on purpose: if this raises, the error
        # surfaces as-is instead of being masked by an unbound
        # ``compiled_query`` in a ``finally`` clause.
        compiled_query = self.create_account_mapping_sql(name)

    # Execute query
    click.echo('\tcreating view...', nl=False)
    query_id = self.athena.execute_query(sql_query=compiled_query)
    # Results are fetched but not used here; presumably this waits for the
    # query to finish -- TODO confirm against the Athena helper.
    self.athena.get_query_results(query_id)
    click.echo('done')