Resources.py (192 lines of code) (raw):

# Copyright 2020-2023 Google, LLC. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import google.auth from google.api_core.client_info import ClientInfo from google.cloud import bigquery from google.cloud import storage from google.cloud import resourcemanager_v3 import constants, configparser USER_AGENT = 'cloud-solutions/datacatalog-tag-engine-v2' class Resources: bigquery_resource = "bigquery" pubsub_resource = "pubsub" gcs_resource = "gs:" def __init__(self, credentials): self.bq_client = bigquery.Client(credentials=credentials, client_info=ClientInfo(user_agent=USER_AGENT)) self.gcs_client = storage.Client(credentials=credentials, client_info=ClientInfo(user_agent=USER_AGENT)) def get_resources(self, included_uris, excluded_uris): #print('enter get_resources()') #print('included_uris: ' + included_uris) # find out what kind of resource we have included_uris_list = included_uris.split(',') resource_type = included_uris_list[0].strip().split('/')[0] #print("resource_type: " + resource_type) if resource_type == self.bigquery_resource: included_resources = self.find_bq_resources(included_uris) #print("included_resources: " + str(included_resources)) if excluded_uris is None or excluded_uris == "" or excluded_uris.isspace(): return included_resources else: #print("excluded_uris: " + excluded_uris) excluded_resources = self.find_bq_resources(excluded_uris) #print("excluded_resources: " + str(excluded_resources)) elif resource_type == self.gcs_resource: included_resources = self.find_gcs_resources(included_uris) if excluded_uris is None or excluded_uris == "" or excluded_uris.isspace(): return included_resources else: #print("excluded_uris: " + excluded_uris) excluded_resources = self.find_gcs_resources(excluded_uris) #print("excluded_resources: " + str(excluded_resources)) else: print('Error: expected to get a bigquery or gcs resource type, but found this: ' + resource_type) return None remaining_resources = included_resources.difference(excluded_resources) return remaining_resources def get_resources_by_project(self, projects): print('projects:', projects) uris = [] for project in projects: print('project:', project) datasets = list(self.bq_client.list_datasets(project=project)) for dataset in datasets: print('dataset:', dataset.dataset_id) formatted_dataset = self.format_dataset_resource(project + '.' + dataset.dataset_id) uris.append(formatted_dataset) tables = self.bq_client.list_tables(project + '.' + dataset.dataset_id) for table in tables: formatted_table = self.format_table_resource(table.full_table_id) uris.append(formatted_table) return uris def get_resources_by_folder(self, folder): if folder.replace('folders/', '').isnumeric() == False: print('Error: The folder parameter must be a numeric value') return if 'folders/' not in folder: folder = 'folders/' + folder rm_client = resourcemanager_v3.ProjectsClient() request = resourcemanager_v3.ListProjectsRequest( parent=folder, ) resp = rm_client.list_projects(request=request) projects = [] for project in resp: projects.append(project.project_id) uris = self.get_resources_by_project(projects) return uris def format_table_resource(self, table_resource): # BQ table format: project:dataset.table # DC expected resource format: project_id + '/datasets/' + dataset + '/tables/' + short_table formatted = table_resource.replace(":", "/datasets/").replace(".", "/tables/") #print("formatted: " + table_resource) return formatted def format_dataset_resource(self, dataset_resource): # BQ table format: project:dataset.table # DC expected resource format: project_id + '/datasets/' + dataset + '/tables/' + short_table formatted = dataset_resource.replace(".", "/datasets/") #print("formatted: " + table_resource) return formatted def get_datasets(self, dataset): dataset_list = [] if dataset.endswith("*"): datasets = list(self.bq_client.list_datasets()) for ds in datasets: if dataset[:-1] in ds.dataset_id: dataset_list.append(ds.dataset_id) else: dataset_list.append(dataset) return dataset_list def find_bq_resources(self, uris): # @input uris: comma-separated list of uri representing a BQ resource # BQ resources are specified as: # bigquery/project/<project>/dataset/<dataset>/<table> # wildcards are allowed in the table and dataset components of the uri resources = set() table_resources = set() column_resources = set() uri_list = uris.split(",") for uri in uri_list: print("uri: " + uri) split_path = uri.strip().split("/") if split_path[1] != "project": print("Error: invalid URI " + path) return None project_id = split_path[2] path_length = len(split_path) #print("path_length: " + str(path_length)) if path_length == 4: print('uri ' + uri + ' is at the project level') datasets = list(self.bq_client.list_datasets(project=project_id)) for dataset in datasets: tables = list(self.bq_client.list_tables(dataset.dataset_id)) for table in tables: table_resources.add(table.full_table_id) tag_type = constants.BQ_TABLE_TAG if path_length > 4: dataset = split_path[4] dataset_list = self.get_datasets(dataset) for dataset_name in dataset_list: dataset_id = project_id + "." + dataset_name print("path_length: ", path_length) print("dataset_id: " + dataset_id) if path_length == 5: tag_type = constants.BQ_DATASET_TAG dataset_resource = self.format_dataset_resource(dataset_id) resources.add(dataset_resource) continue table_expression = split_path[5] print("table_expression: " + table_expression) if path_length != 6: print("Error. Invalid URI " + path) return None else: tag_type = constants.BQ_TABLE_TAG if table_expression == "*": #print("list tables in dataset") tables = list(self.bq_client.list_tables(self.bq_client.get_dataset(dataset_id))) for table in tables: #print("full_table_id: " + str(table.full_table_id)) table_resources.add(table.full_table_id) elif "*" in table_expression: #print("table expression contains wildcard") table_substrings = table_expression.split("*") tables = list(self.bq_client.list_tables(self.bq_client.get_dataset(dataset_id))) for table in tables: is_match = True for substring in table_substrings: if substring not in table.full_table_id: is_match = False break if is_match == True: table_resources.add(table.full_table_id) else: table_id = dataset_id + "." + table_expression try: table = self.bq_client.get_table(table_id) table_resources.add(table.full_table_id) except NotFound: print("Error: " + table_id + " not found.") if tag_type == constants.BQ_TABLE_TAG: for table in table_resources: formatted_table = self.format_table_resource(table) resources.add(formatted_table) return resources def find_gcs_resources(self, uris): resources = set() uris_list = uris.split(',') for uri in uris_list: # remove the 'gs://' prefix from the uri short_uri = uri[5:].strip() #print('short_uri: ' + short_uri) split_uri = short_uri.split('/') bucket_name = split_uri[0] #print('bucket_name: ' + bucket_name) # uri contains a folder # examples: discovery-area/cities_311/* or discovery-area/cities_311/austin_311_service_requests.parquet if len(split_uri) > 2: folder_start_index = len(bucket_name) + 1 #print('folder_start_index: ', folder_start_index) # uri points to a folder if short_uri.endswith('/*'): folder_end_index = short_uri.index('/*') folder = short_uri[folder_start_index:folder_end_index] #print('folder: ' + folder) for blob in self.gcs_client.list_blobs(bucket_name, prefix=folder): if blob.name == folder + '/' or blob.name.endswith('/'): continue resources.add((bucket_name, blob.name)) # uri points to a specific file # example: discovery-area/cities_311/austin_311_service_requests.parquet else: filename = short_uri[folder_start_index:] #print('filename: ' + filename) bucket = self.gcs_client.get_bucket(bucket_name) blob = bucket.blob(filename) if blob.exists(): resources.add((bucket_name, blob.name)) # uri does not contain a folder # examples: discovery-area/* or discovery-area/austin_311_service_requests.parquet elif len(split_uri) == 2: if short_uri.endswith('/*'): for blob in self.gcs_client.list_blobs(bucket_name): if blob.name.endswith('/'): continue #print('blob: ' + str(blob.name)) resources.add((bucket_name, blob.name)) else: file_index_start = short_uri.index('/') + 1 filename = short_uri[file_index_start:] #print('filename: ' + filename) bucket = self.gcs_client.get_bucket(bucket_name) blob = bucket.blob(filename) if blob.exists(): if blob.name.endswith('/') == False: resources.add((bucket_name, blob.name)) else: print('Error: invalid uri provided: ' + uri) return resources if __name__ == '__main__': credentials, _ = google.auth.default() res = Resources(credentials) #uris = res.get_resources('bigquery/project/tag-engine-run/dataset/GCP_Mockup/*', None) #uris = res.get_resources('gs://csv-meta-tags/sakila/sakila_column_tags.csv', None) uris = res.get_resources('gs://csv-meta-tags/sakila/*', None) #uris = get_resources_by_project(['record-manager-service']) #uris = get_resources_by_folder('folders/a593258468753') print(uris)