# Source: analytics-hub/snippets/create_listing_python/main.py (260 lines of code, raw)

# -*- coding: utf-8 -*-
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Example: create Analytics Hub exchanges, listings and a Data Clean Room.

The script creates (or reuses) a regular data exchange with a listing, grants
IAM roles on that listing, then creates (or reuses) a Data Clean Room exchange
whose listing shares a BigQuery view carrying an aggregation-threshold
privacy policy.
"""

import argparse
import base64
import time
from http import HTTPStatus
from string import Template

from google.cloud import bigquery
from google.cloud import bigquery_analyticshub_v1
from google.iam.v1 import iam_policy_pb2
from google.protobuf.json_format import MessageToDict


def _status_code(ex: Exception):
    """Return the HTTP-style ``code`` attached to an API error, or ``None``.

    API errors raised by the Google clients are compared against
    ``HTTPStatus`` values in this script; any other exception (network
    failure, programming error) may have no ``code`` attribute at all, so it
    must not be accessed unconditionally.
    """
    return getattr(ex, "code", None)


def _get_or_create(get_call, get_request, create_call, build_create_request):
    """Fetch a resource, creating it only when the fetch reports NOT_FOUND.

    Args:
        get_call: Client method that fetches the resource (called with
            ``request=get_request``).
        get_request: Request object passed to ``get_call``.
        create_call: Client method that creates the resource.
        build_create_request: Zero-argument callable returning the create
            request; it is only invoked when the resource does not exist.

    Returns:
        The fetched or newly created resource, or ``False`` when fetching
        failed for another reason or creation failed (the error is printed).
    """
    try:
        response = get_call(request=get_request)
        print(response)
        return response
    except Exception as ex:  # Broad on purpose: the example reports and continues.
        if _status_code(ex) != HTTPStatus.NOT_FOUND:
            print(ex)
            return False

    print("Not found, creating")
    create_request = build_create_request()
    try:
        response = create_call(request=create_request)
        print(response)
        return response
    except Exception as ex:
        print(ex)
        return False


def get_or_create_exchange(client: bigquery_analyticshub_v1.AnalyticsHubServiceClient,
                           project_id: str, location: str, exchange_id: str, is_dcr: bool):
    """Return data exchange ``exchange_id``, creating it if it does not exist.

    Args:
        client: Analytics Hub API client.
        project_id: Project that owns the exchange.
        location: Location of the exchange.
        exchange_id: Exchange ID within ``projects/{project_id}/locations/{location}``.
        is_dcr: When creating, make a Data Clean Room exchange instead of a
            default data exchange.

    Returns:
        The existing or newly created ``DataExchange``, or ``False`` on error.
    """
    parent = f"projects/{project_id}/locations/{location}"

    def build_create_request():
        sharing_environment_config = bigquery_analyticshub_v1.SharingEnvironmentConfig()
        if is_dcr:
            # DataCleanRoom
            sharing_environment_config.dcr_exchange_config = (
                bigquery_analyticshub_v1.SharingEnvironmentConfig.DcrExchangeConfig()
            )
            ex_title_tag = "Data Clean Room"
        else:
            sharing_environment_config.default_exchange_config = (
                bigquery_analyticshub_v1.SharingEnvironmentConfig.DefaultExchangeConfig()
            )
            ex_title_tag = "Data Exchange"

        data_exchange = bigquery_analyticshub_v1.DataExchange()
        data_exchange.display_name = f"Example {ex_title_tag} - created using python API"
        data_exchange.description = f"Example {ex_title_tag} - created using python API"
        data_exchange.primary_contact = ""
        data_exchange.documentation = "https://link.to.optional.documentation/"
        data_exchange.sharing_environment_config = sharing_environment_config

        return bigquery_analyticshub_v1.CreateDataExchangeRequest(
            parent=parent,
            data_exchange_id=exchange_id,
            data_exchange=data_exchange,
        )

    return _get_or_create(
        client.get_data_exchange,
        bigquery_analyticshub_v1.GetDataExchangeRequest(
            name=f"{parent}/dataExchanges/{exchange_id}",
        ),
        client.create_data_exchange,
        build_create_request,
    )


def get_or_create_listing(client: bigquery_analyticshub_v1.AnalyticsHubServiceClient,
                          project_id: str, location: str, exchange_id: str, listing_id: str,
                          restrict_egress: bool, shared_ds: str):
    """Return listing ``listing_id`` in an exchange, creating it if missing.

    Args:
        client: Analytics Hub API client.
        project_id: Project that owns the exchange and the shared dataset.
        location: Location of the exchange.
        exchange_id: Exchange that holds the listing.
        listing_id: Listing ID within the exchange.
        restrict_egress: When creating, enable the restricted-export settings
            (export, direct table access and query-result restrictions).
        shared_ds: Dataset ID (in ``project_id``) that the listing shares.

    Returns:
        The existing or newly created ``Listing``, or ``False`` on error.
    """
    parent = f"projects/{project_id}/locations/{location}/dataExchanges/{exchange_id}"

    def build_create_request():
        listing = bigquery_analyticshub_v1.Listing()
        listing.display_name = "Example Exchange Listing - created using python API"
        listing.description = "Example Exchange Listing - created using python API"
        listing.data_provider = bigquery_analyticshub_v1.DataProvider()
        listing.data_provider.name = "Example Exchange Listing - created using python API"
        listing.data_provider.primary_contact = "primary@contact.co"
        listing.bigquery_dataset = bigquery_analyticshub_v1.Listing.BigQueryDatasetSource()
        listing.bigquery_dataset.dataset = f"projects/{project_id}/datasets/{shared_ds}"

        # All three restrictions follow the single restrict_egress switch.
        restricted = bool(restrict_egress)
        listing.restricted_export_config = bigquery_analyticshub_v1.Listing.RestrictedExportConfig()
        listing.restricted_export_config.enabled = restricted
        listing.restricted_export_config.restrict_direct_table_access = restricted
        listing.restricted_export_config.restrict_query_result = restricted

        return bigquery_analyticshub_v1.CreateListingRequest(
            parent=parent,
            listing_id=listing_id,
            listing=listing,
        )

    return _get_or_create(
        client.get_listing,
        bigquery_analyticshub_v1.GetListingRequest(name=f"{parent}/listings/{listing_id}"),
        client.create_listing,
        build_create_request,
    )


def get_or_create_dcr_listing(client: bigquery_analyticshub_v1.AnalyticsHubServiceClient,
                              project_id: str, location: str, exchange_id: str, listing_id: str,
                              shared_ds: str, source_view: str):
    """Return a Data Clean Room listing sharing one view, creating it if missing.

    Args:
        client: Analytics Hub API client.
        project_id: Project that owns the exchange and the shared dataset.
        location: Location of the exchange.
        exchange_id: Data Clean Room exchange that holds the listing.
        listing_id: Listing ID within the exchange.
        shared_ds: Dataset ID (in ``project_id``) containing ``source_view``.
        source_view: Table/view ID selected as the only shared resource; also
            used as the listing display name.

    Returns:
        The existing or newly created ``Listing``, or ``False`` on error.
    """
    parent = f"projects/{project_id}/locations/{location}/dataExchanges/{exchange_id}"

    def build_create_request():
        listing = bigquery_analyticshub_v1.Listing()
        listing.display_name = source_view
        listing.primary_contact = "primary@contact.co"
        listing.bigquery_dataset = bigquery_analyticshub_v1.Listing.BigQueryDatasetSource()
        listing.bigquery_dataset.dataset = f"projects/{project_id}/datasets/{shared_ds}"

        selected_resource = bigquery_analyticshub_v1.Listing.BigQueryDatasetSource.SelectedResource()
        selected_resource.table = f"projects/{project_id}/datasets/{shared_ds}/tables/{source_view}"
        listing.bigquery_dataset.selected_resources = [selected_resource]

        # Direct table access is restricted; query results are not.
        listing.restricted_export_config = bigquery_analyticshub_v1.Listing.RestrictedExportConfig()
        listing.restricted_export_config.enabled = True
        listing.restricted_export_config.restrict_direct_table_access = True
        listing.restricted_export_config.restrict_query_result = False

        return bigquery_analyticshub_v1.CreateListingRequest(
            parent=parent,
            listing_id=listing_id,
            listing=listing,
        )

    return _get_or_create(
        client.get_listing,
        bigquery_analyticshub_v1.GetListingRequest(name=f"{parent}/listings/{listing_id}"),
        client.create_listing,
        build_create_request,
    )


def create_set_iam_policy_request(client: bigquery_analyticshub_v1.AnalyticsHubServiceClient,
                                  listing_id: str, role: str, member: str):
    """Build a ``SetIamPolicyRequest`` that adds ``member`` to ``role``.

    The listing's current policy is read first. Its bindings are copied, and
    its etag is carried over so that a concurrent modification is rejected
    (the caller retries on CONFLICT) instead of being overwritten.

    Args:
        client: Analytics Hub API client.
        listing_id: Full resource name of the listing.
        role: Role to grant, e.g. ``roles/analyticshub.subscriber``.
        member: IAM member with a ``user:`` / ``serviceAccount:`` style prefix.

    Returns:
        The request, or ``False`` if the current policy could not be read.
    """
    existing_policy = listing_get_iam_policy(client, listing_id)
    if existing_policy is False:
        return False
    existing = MessageToDict(existing_policy)

    # Copy each binding (and its member list) so the parsed dict is not mutated.
    policy = {
        "bindings": [
            {**binding, "members": list(binding.get("members", []))}
            for binding in existing.get("bindings", [])
        ],
    }
    if "etag" in existing:
        # MessageToDict renders the bytes etag as base64; decode it back.
        policy["etag"] = base64.b64decode(existing["etag"])
    if "version" in existing:
        # Keep the policy version; conditional bindings require version 3.
        policy["version"] = existing["version"]

    # Only extend an unconditional binding: adding the member to a
    # conditional one would grant the role only under that condition.
    for binding in policy["bindings"]:
        if binding.get("role") == role and "condition" not in binding:
            if member not in binding["members"]:
                binding["members"].append(member)
            break
    else:
        policy["bindings"].append({"role": role, "members": [member]})

    return iam_policy_pb2.SetIamPolicyRequest(resource=listing_id, policy=policy)


def listing_add_iam_policy_member(client: bigquery_analyticshub_v1.AnalyticsHubServiceClient,
                                  listing_id: str, role: str, member: str,
                                  *, max_attempts: int = 5, retry_delay: float = 60.0):
    """Grant ``role`` to ``member`` on a listing, retrying on etag conflicts.

    Args:
        client: Analytics Hub API client.
        listing_id: Full resource name of the listing.
        role: Role to grant.
        member: IAM member to add.
        max_attempts: Maximum number of read-modify-write attempts.
        retry_delay: Seconds to wait before retrying after a CONFLICT.

    Returns:
        The updated policy returned by ``set_iam_policy``, or ``False`` if the
        request could not be built, failed, or every attempt conflicted.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            request = create_set_iam_policy_request(client, listing_id, role, member)
            if not request:
                return False
            return client.set_iam_policy(request=request)
        except Exception as ex:
            # CONFLICT == concurrent modification / Etag mismatch
            if _status_code(ex) != HTTPStatus.CONFLICT:
                # TODO: handle UserNotFound error (e.g. the user to be added does not exist)
                print(ex)
                return False
            if attempt == max_attempts:
                print(ex)
                return False
            print("listing_add_iam_policy_member: concurrent modification (Etag mismatch), retrying")
            time.sleep(retry_delay)
    return False


def listing_get_iam_policy(client: bigquery_analyticshub_v1.AnalyticsHubServiceClient,
                           listing_id: str):
    """Return the IAM policy of a listing, or ``False`` on error (printed).

    Args:
        client: Analytics Hub API client.
        listing_id: Full resource name of the listing.
    """
    request = iam_policy_pb2.GetIamPolicyRequest(resource=listing_id)
    try:
        return client.get_iam_policy(request=request)
    except Exception as ex:
        print(ex)
        return False


def bq_view_prep_ddl(project_id: str, dataset_id: str, source_table_id: str,
                     dst_table_id: str, privacy_unit_column: str, *, threshold: int = 3):
    """Generate the DDL for a view with an aggregation-threshold privacy policy.

    Table paths are backtick-quoted so project IDs containing hyphens are
    accepted.

    Args:
        project_id: Project of the source table and the new view.
        dataset_id: Dataset of the source table and the new view.
        source_table_id: Table the view selects from.
        dst_table_id: Name of the view to create or replace.
        privacy_unit_column: Column used as the privacy unit.
        threshold: Aggregation threshold written into the policy.

    Returns:
        The ``CREATE OR REPLACE VIEW`` statement.
    """
    create_view_ddl_template = Template(
        """CREATE OR REPLACE VIEW `$project_id.$dataset_id.$dst_table_id`
OPTIONS(
  privacy_policy= '{"aggregation_threshold_policy": {"threshold": $threshold, "privacy_unit_column": "$privacy_unit_column"}}'
)
AS ( SELECT * FROM `$project_id.$dataset_id.$source_table_id` );"""
    )
    return create_view_ddl_template.substitute(
        project_id=project_id,
        dataset_id=dataset_id,
        source_table_id=source_table_id,
        dst_table_id=dst_table_id,
        privacy_unit_column=privacy_unit_column,
        threshold=int(threshold),
    )


def create_bq_view_with_analysis_rules(client: bigquery.Client, project_id: str, dataset_id: str,
                                       source_table_id: str, dst_table_id: str,
                                       privacy_unit_column: str = "test"):
    """Create the privacy-policy view by running the generated DDL.

    Args:
        client: BigQuery client.
        project_id: Project of the source table and the view.
        dataset_id: Dataset of the source table and the view.
        source_table_id: Table the view selects from.
        dst_table_id: View to create or replace.
        privacy_unit_column: Privacy unit column for the policy (defaults to
            the previously hard-coded ``"test"``).

    Returns:
        ``True`` on success, ``False`` if the job failed (error printed).
    """
    ddl_statement = bq_view_prep_ddl(project_id, dataset_id, source_table_id,
                                     dst_table_id, privacy_unit_column)
    try:
        query_job = client.query(ddl_statement)
        # Wait for completion; result() raises if the job failed.
        query_job.result()
    except Exception as ex:
        print(ex)
        return False
    # Defensive fallback in case a failure is reported without raising.
    if query_job.error_result:
        print(query_job.error_result)
        return False
    return True


def get_bq_table_metadata(client: bigquery.Client, dataset_id: str, table_id: str):
    """Fetch table metadata from the client's default project.

    Returns:
        ``(table, None)`` on success or ``(None, error)`` on failure.
    """
    try:
        # A "dataset.table" string resolves against the client's project;
        # this replaces the deprecated Client.dataset() helper.
        table_metadata = client.get_table(f"{dataset_id}.{table_id}")
        return table_metadata, None  # No error
    except Exception as e:
        return None, e  # Return error


def parse_commandline_args():
    """Parse command-line arguments and return them as a dictionary."""
    parser = argparse.ArgumentParser(description="Command-line parameter parser")

    # Required arguments
    parser.add_argument("--project_id", help="Google Cloud project ID", required=True)
    parser.add_argument("--location", help="Location for the BigQuery dataset", required=True)
    parser.add_argument("--exchange_id", help="Exchange ID", required=True)
    parser.add_argument("--listing_id", help="Listing ID", required=True)
    parser.add_argument("--shared_ds", help="Shared dataset ID", required=True)
    parser.add_argument("--dcr_shared_table", help="Table to share in Data Clean Room", required=True)
    parser.add_argument("--dcr_privacy_column", help="Privacy column for Data Clean Room", required=True)
    parser.add_argument("--dcr_view", help="View with analysis rules to create for Data Clean Room",
                        required=True)
    parser.add_argument("--subscriber_iam_member",
                        help="IAM member who can subscribe - requires either user: or serviceAccount: prefix",
                        required=True)
    parser.add_argument("--subscription_viewer_iam_member",
                        help="IAM member who can see subscription and request access - requires either user: or serviceAccount: prefix",
                        required=True)

    # Optional flag: a required store_true flag could never be False.
    parser.add_argument("--restrict_egress", help="Restrict egress", action='store_true')

    args = parser.parse_args()

    # Convert parsed arguments to a dictionary
    return vars(args)


def main() -> None:
    """Create the example exchange/listing with IAM grants, then the DCR."""
    arguments = parse_commandline_args()
    arguments['dcr_exchange_id'] = f"{arguments['exchange_id']}_dcr"
    arguments['dcr_listing_id'] = f"{arguments['listing_id']}_dcr"
    print(f"Parsed arguments: {arguments}")  # For demonstration

    client_ah = bigquery_analyticshub_v1.AnalyticsHubServiceClient()
    # Pin the BigQuery client to the requested project so table lookups hit
    # the same project the view DDL and listings reference.
    client_bq = bigquery.Client(project=arguments["project_id"])

    print("Creating Data Exchange")
    exchg = get_or_create_exchange(client_ah, arguments["project_id"], arguments["location"],
                                   arguments["exchange_id"], False)
    if exchg:
        print(exchg)
        listing = get_or_create_listing(client_ah, arguments["project_id"], arguments["location"],
                                        arguments["exchange_id"], arguments["listing_id"],
                                        arguments["restrict_egress"], arguments["shared_ds"])
        if listing:
            print(listing)
            policy = listing_get_iam_policy(client_ah, listing.name)
            print("IAMPolicy.before")
            print(policy)

            policy = listing_add_iam_policy_member(client_ah, listing.name,
                                                   "roles/analyticshub.subscriber",
                                                   arguments["subscriber_iam_member"])
            print("IAMPolicy.returned")
            print(policy)

            policy = listing_add_iam_policy_member(client_ah, listing.name,
                                                   "roles/analyticshub.viewer",
                                                   arguments["subscription_viewer_iam_member"])
            print("IAMPolicy.returned")
            print(policy)

            print("IAMPolicy.after")
            policy = listing_get_iam_policy(client_ah, listing.name)
            print(policy)

    print("\nCreating Data Clean Room")
    exchg_id = arguments["dcr_exchange_id"]
    listing_id = arguments["dcr_listing_id"]
    exchg = get_or_create_exchange(client_ah, arguments["project_id"], arguments["location"],
                                   exchg_id, True)
    if exchg:
        print(exchg)
        if create_bq_view_with_analysis_rules(client_bq, arguments["project_id"],
                                              arguments["shared_ds"], arguments["dcr_shared_table"],
                                              arguments["dcr_view"],
                                              arguments["dcr_privacy_column"]):
            (tmd, err) = get_bq_table_metadata(client_bq, arguments["shared_ds"], arguments["dcr_view"])
            print(tmd)
            if tmd is None:
                print(err)
            else:
                listing = get_or_create_dcr_listing(client_ah, arguments["project_id"],
                                                    arguments["location"], exchg_id, listing_id,
                                                    arguments["shared_ds"], arguments["dcr_view"])
                if listing:
                    print(listing)


if __name__ == "__main__":
    main()