in 5-app-infra/3-artifact-publish/docker/cdmc/record_manager/Service.py [0:0]
def search_catalog(self):
    """Find tagged BigQuery tables in scope and collect their retention details.

    Builds a Data Catalog search restricted to ``self.projects_in_scope`` (and,
    when any are configured, ``self.datasets_in_scope``) for entries tagged with
    the retention period field of the configured tag template. For every
    BigQuery table found, the entry is looked up, its tags are listed, and the
    first tag of the configured template that carries a usable retention period
    and expiration action yields one record.

    Returns:
        list[dict]: one record per qualifying table with the keys
        ``project``, ``dataset``, ``table``, ``year``, ``month``, ``day``
        (the entry's creation date), ``retention_period`` and
        ``expiration_action``.
    """
    retention_records = []  # tables to be processed along with their retention details

    scope = datacatalog.SearchCatalogRequest.Scope()
    for project_id in self.projects_in_scope:
        print('Info: project ' + project_id + ' is in scope')
        scope.include_project_ids.append(project_id)

    search_request = datacatalog.SearchCatalogRequest()
    search_request.scope = scope

    query = 'tag:' + self.template_project + '.' + self.template_id + '.' + self.retention_period_field

    # The first dataset is joined with "and", every further one with "or".
    for dataset_index, dataset_in_scope in enumerate(self.datasets_in_scope):
        print('Info: dataset_in_scope: ' + dataset_in_scope)
        connector = ' and parent:' if dataset_index == 0 else ' or parent:'
        query += connector + dataset_in_scope

    print('Info: using query: ' + query)
    search_request.query = query
    # NOTE(review): a page size of 1 presumably costs one round trip per
    # result; confirm whether this is intentional before raising it.
    search_request.page_size = 1

    # Loop-invariant: full resource name of the tag template we care about.
    template_name = 'projects/{0}/locations/{1}/tagTemplates/{2}'.format(
        self.template_project, self.template_region, self.template_id)

    for result in self.dc_client.search_catalog(search_request):

        if result.integrated_system != types.IntegratedSystem.BIGQUERY:
            continue

        # Expected shape:
        # //bigquery.googleapis.com/projects/<p>/datasets/<d>/tables/<t>
        # Splitting on '/' (rather than rewriting to dots) keeps project ids
        # that contain '.' intact and lets us skip non-table resources
        # instead of failing with an IndexError.
        segments = result.linked_resource.split('/')
        if (len(segments) != 9 or segments[3] != 'projects'
                or segments[5] != 'datasets' or segments[7] != 'tables'):
            print('Warning: skipping unexpected linked resource: ' + result.linked_resource)
            continue

        table_project, dataset, table = segments[4], segments[6], segments[8]

        print('Info: found tagged table: ' + table)

        # Use a separate name so the search request driving this loop is not shadowed.
        lookup_request = datacatalog.LookupEntryRequest()
        lookup_request.linked_resource = result.linked_resource
        entry = self.dc_client.lookup_entry(lookup_request)

        if entry.bigquery_table_spec.table_source_type != types.TableSourceType.BIGQUERY_TABLE:
            continue

        create_time = entry.source_system_timestamps.create_time

        for tag in self.dc_client.list_tags(parent=entry.name, timeout=120):

            if tag.template != template_name:
                continue

            # Reset per tag so that a missing field can neither raise
            # UnboundLocalError nor reuse a value left over from an earlier
            # tag or table.
            retention_period_value = None
            expiration_action_value = None

            if self.retention_period_field in tag.fields:
                retention_period_value = tag.fields[self.retention_period_field].double_value

            if self.expiration_action_field in tag.fields:
                action_field = tag.fields[self.expiration_action_field]
                if action_field.enum_value:
                    expiration_action_value = action_field.enum_value.display_name

            if (retention_period_value is not None
                    and retention_period_value >= -1
                    and expiration_action_value):
                retention_records.append({
                    "project": table_project,
                    "dataset": dataset,
                    "table": table,
                    "year": create_time.year,
                    "month": create_time.month,
                    "day": create_time.day,
                    "retention_period": retention_period_value,
                    "expiration_action": expiration_action_value,
                })
                break  # one record per table is enough

    return retention_records