5-app-infra/3-artifact-publish/docker/cdmc/record_manager/Service.py
def run_service(self, file_path):
    """Read the JSON parameter file, validate the expected keys, create the
    BigQuery and Data Catalog clients, and then run the snapshot, expiration,
    and archive processing over the retention records found in the catalog."""

    # load and validate the input parameters
    # note: 's' is presumably a module-level Service instance defined elsewhere in this file
    json_input = s.get_param_file(file_path)

    if not json_input:
        print('Missing json parameters.')
        sys.exit()

    if 'template_id' not in json_input:
        print('Missing template_id parameter.')
        sys.exit()
    else:
        self.template_id = json_input['template_id']
        print('template_id: ', self.template_id)

    if 'template_project' not in json_input:
        print('Missing template_project parameter.')
        sys.exit()
    else:
        self.template_project = json_input['template_project']

    if 'template_region' not in json_input:
        print('Missing template_region parameter.')
        sys.exit()
    else:
        self.template_region = json_input['template_region']

    if 'retention_period_field' not in json_input:
        print('Missing retention_period_field parameter.')
        sys.exit()
    else:
        self.retention_period_field = json_input['retention_period_field']

    if 'expiration_action_field' not in json_input:
        print('Missing expiration_action_field parameter.')
        sys.exit()
    else:
        self.expiration_action_field = json_input['expiration_action_field']

    if 'projects_in_scope' not in json_input:
        print('Missing projects_in_scope parameter.')
        sys.exit()
    else:
        self.projects_in_scope = json_input['projects_in_scope']

    # datasets_in_scope is optional and defaults to an empty list
    if 'datasets_in_scope' in json_input:
        self.datasets_in_scope = json_input['datasets_in_scope']
        print('Info: datasets_in_scope set to:', self.datasets_in_scope)
    else:
        self.datasets_in_scope = []
        print('Info: datasets_in_scope is empty.')

    if 'bigquery_region' not in json_input:
        print('Missing bigquery_region parameter.')
        sys.exit()
    else:
        self.bigquery_region = json_input['bigquery_region']

    if 'snapshot_project' not in json_input:
        print('Missing snapshot_project parameter.')
        sys.exit()
    else:
        self.snapshot_project = json_input['snapshot_project']

    if 'snapshot_dataset' not in json_input:
        print('Missing snapshot_dataset parameter.')
        sys.exit()
    else:
        self.snapshot_dataset = json_input['snapshot_dataset']

    if 'snapshot_retention_period' not in json_input:
        print('Missing snapshot_retention_period parameter.')
        sys.exit()
    else:
        self.snapshot_retention_period = json_input['snapshot_retention_period']

    if 'archives_bucket' not in json_input:
        print('Missing archives_bucket parameter.')
        sys.exit()
    else:
        self.archives_bucket = json_input['archives_bucket']

    if 'export_format' not in json_input:
        print('Missing export_format parameter.')
        sys.exit()
    else:
        self.export_format = json_input['export_format']

    if 'archives_project' not in json_input:
        print('Missing archives_project parameter.')
        sys.exit()
    else:
        self.archives_project = json_input['archives_project']

    if 'archives_dataset' not in json_input:
        print('Missing archives_dataset parameter.')
        sys.exit()
    else:
        self.archives_dataset = json_input['archives_dataset']

    if 'remote_connection' not in json_input:
        print('Missing remote_connection parameter.')
        sys.exit()
    else:
        self.remote_connection = json_input['remote_connection']

    if 'tag_engine_endpoint' not in json_input:
        print('Missing tag_engine_endpoint parameter.')
        sys.exit()
    else:
        self.tag_engine_endpoint = json_input['tag_engine_endpoint']

    if 'mode' not in json_input:
        print('Missing mode parameter.')
        sys.exit()
    else:
        if json_input['mode'] != 'validate' and json_input['mode'] != 'apply':
            print('Invalid mode parameter. Must be equal to "validate" or "apply".')
            sys.exit()
        self.mode = json_input['mode']

    # create clients
    self.dc_client = datacatalog.DataCatalogClient()
    self.bq_client = bigquery.Client(location=self.bigquery_region)

    print('Info: running in', self.mode, 'mode.')
    print(f'Info Bearer Token: {self.get_bearer_token()}')
    print(f'Info OAuth Token: {self.get_oauth_token()}')

    # search catalog
    retention_records = s.search_catalog()
    print('Info: found retention records in the catalog:', retention_records)

    # process purge actions
    s.create_snapshots(retention_records)
    s.expire_tables(retention_records)

    # process archive actions
    s.archive_tables(retention_records)
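
For reference, a minimal invocation sketch: the parameter file below contains every key that run_service validates above. All values are hypothetical placeholders, the value types (lists vs. strings) are guesses, and the Service class name and import path are assumed from the file name; a real deployment supplies its own configuration.

# Illustrative only: every value below is a placeholder, not taken from the source.
import json

from Service import Service  # assumes the enclosing class is named Service

params = {
    'template_id': 'cdmc_retention',                 # tag template holding the retention fields
    'template_project': 'tag-template-project',
    'template_region': 'us-central1',
    'retention_period_field': 'retention_period',
    'expiration_action_field': 'expiration_action',
    'projects_in_scope': ['data-project-1', 'data-project-2'],
    'datasets_in_scope': ['sales', 'finance'],       # optional; run_service defaults this to []
    'bigquery_region': 'us-central1',
    'snapshot_project': 'snapshot-project',
    'snapshot_dataset': 'table_snapshots',
    'snapshot_retention_period': 30,
    'archives_bucket': 'archives-bucket',
    'export_format': 'parquet',
    'archives_project': 'archives-project',
    'archives_dataset': 'table_archives',
    'remote_connection': 'projects/archives-project/locations/us/connections/gcs-conn',
    'tag_engine_endpoint': 'https://tag-engine.example.com',
    'mode': 'validate',                              # must be 'validate' or 'apply'
}

with open('params.json', 'w') as f:
    json.dump(params, f, indent=2)

Service().run_service('params.json')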