in cid/common.py [0:0]
def create_dataset(self, dataset_definition) -> bool:
    """Create a dataset from a dataset definition.

    First makes sure every Athena view the definition depends on is known:
    views are discovered, and those still absent from the Athena metadata
    are created through ``self.create_views``. Then the template named by
    the definition's ``File`` entry is loaded, its placeholders are
    substituted, and the parsed JSON is handed to ``self.qs.create_dataset``.

    Args:
        dataset_definition: Mapping describing the dataset. Keys read here:
            ``dependsOn`` (optional mapping with ``views`` and ``cur``),
            ``File``, ``providedBy`` and ``Name``.

    Returns:
        True once ``self.qs.create_dataset`` has been called; False when no
        Athena data source is available even after trying to create one.

    Raises:
        SystemExit: With code 1 when the definition has no ``File`` entry.
    """
    # A definition without 'dependsOn' (or without 'views') simply has no
    # prerequisites; treat it as empty instead of failing on None.
    depends_on = dataset_definition.get('dependsOn') or {}
    required_views = [
        self.cur.tableName if name == '${cur_table_name}' else name
        for name in depends_on.get('views') or []
    ]

    def undiscovered():
        """Return the required views absent from the Athena metadata, sorted."""
        return sorted(
            set(required_views).difference(self.athena._metadata.keys()))

    self.athena.discover_views(required_views)
    # Retry discovery for whatever the first pass did not register.
    self.athena.discover_views(undiscovered())
    # Anything still unknown after both passes has to be created.
    missing_views = undiscovered()
    if missing_views:
        print(f'\tmissing Athena views: {missing_views}')
        self.create_views(missing_views)

    # Without a template file there is nothing to build the dataset from.
    dataset_file = dataset_definition.get('File')
    if not dataset_file:
        print(f"Error: {dataset_definition.get('Name')} definition is broken")
        # Raise directly rather than calling the interactive `exit` helper,
        # which is only installed by the `site` module.
        raise SystemExit(1)

    if not self.qs.athena_datasources:
        logger.info('No Athena datasources found, attempting to create one')
        self.qs.create_data_source()
        if not self.qs.athena_datasources:
            logger.info('No Athena datasources available, failing')
            return False

    # Values substituted for the ${...} placeholders of the template.
    columns_tpl = {
        'cur_table_name': self.cur.tableName if depends_on.get('cur') else None,
        'athena_datasource_arn': next(iter(self.qs.athena_datasources)),
        'athena_database_name': self.athena.DatabaseName,
        'user_arn': self.qs.user.get('Arn'),
    }
    template_text = resource_string(
        dataset_definition.get('providedBy'),
        f'data/datasets/{dataset_file}',
    ).decode('utf-8')
    # safe_substitute leaves unknown placeholders in place instead of raising.
    compiled_dataset = json.loads(
        Template(template_text).safe_substitute(columns_tpl))
    self.qs.create_dataset(compiled_dataset)
    return True