python/lookerstudio/lookerstudio_deployment.py (78 lines of code) (raw):

# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # README # Script loads the configuration from the config.ini and generates a clickable link using the Looker Studio Linking API. # Linking API Reference: https://developers.google.com/looker-studio/integrate/linking-api # This script assumes the default configuration of the views used by the Marketing Analytics Dashboard. # You are required to update the config.ini file with your own project and datasets in the [COMMON] section. import sys from google.cloud import bigquery from google.api_core.exceptions import Forbidden, NotFound, BadRequest from google.auth.exceptions import GoogleAuthError from configparser import ConfigParser, ExtendedInterpolation, Error as ConfigError # Constants CONFIG_FILE = "python/lookerstudio/config.ini" BASE_URL = "https://lookerstudio.google.com/reporting/create?" REPORT_ID = "f61f65fe-4991-45fc-bcdc-80593966f28c" REPORT_NAME = "Marketing%20Analytics%20Sample" def parse_config(config_file:str) -> list: """ Parses config file and returns a list of dicts for the data sources. """ try: # ExtendedInterpolation to parse ${} format in config file config = ConfigParser(interpolation=ExtendedInterpolation()) # optionxform to preserve case config.optionxform = str config.read(config_file) sections = config.sections() sources = list() for section in sections: # Do not add COMMON section to collection of options if section == "COMMON": continue # Convert options for the section to dict options = dict(config.items(section)) # Format section name for URL and add to options dict section = section.replace(' ','%20') options.update(dict(datasourceName=section)) sources.append(options) return sources # Handle exception from ConfigParser and exit program with error except ConfigError as error: print(f'Error parsing file {CONFIG_FILE}.\n{error}') sys.exit(1) def check_view_exists(resource_id:str) -> bool: """ Opens connection to BigQuery, \cCheck if view exists and account has access. Accepts a string Resource ID as a parameter. Expects Resource ID to be formatted. `ProjectId.DatasetId.TableId` Returns True if view exists and is accessible. """ view_exists = True try: bq_client.get_table(resource_id) # Catch exception if path to view does not exist and sets view_exists to false for further processing. except NotFound: print(f"ERROR: `{resource_id}` not found. Ensure the path is correct and the view has been created.") view_exists = False # Catch exception if path to view exists but user does not have access and sets view_exists to false for further processing. except Forbidden: print(f"ERROR: Access denied on `{resource_id}`. Ensure your account has access.") view_exists = False except BadRequest: print(f"ERROR: Project in `{resource_id}` does not exist.") view_exists = False return view_exists def add_data_source(data_source: dict) -> str: """ Formats data source dictionary as URL and returns formatted URL. Expects a dict parameter for data sources. """ resultUrl = str() # Get data source alias from first dict item ds_alias = data_source["ds_alias"] # Construct url from data_source and append for key,value in data_source.items(): # Exclude ds_alias from key/value URL generation if key != "ds_alias": resultUrl += f"&ds.{ds_alias}.{key}={value}" return resultUrl def main(): views_exist = True report_url = BASE_URL + 'c.reportId=' + REPORT_ID + '&c.explain=true' + '&r.reportName=' + REPORT_NAME # Get sources from config file sources = parse_config(CONFIG_FILE) for source in sources: # Get fully qualified view Resource ID and check it exists view_id = f"{source['projectId']}.{source['datasetId']}.{source['tableId']}" # If view doesn't exist, skip constructing URL and check next view if not check_view_exists(view_id): views_exist = False continue # Check views_exist, if one does not then don't bother generating urls for remaining sources # But continue through loop to continue checking if they are accessible if views_exist: # View exists so construct url and append to main report_url report_url += add_data_source(source) # If all views do not exist and are not accessible, print error and exit main() if not views_exist: print('\nAn error ocurred. See error(s) above and try again.') sys.exit(1) print('Click link to copy report to your account and finish configuring data sources:\n') print(report_url) # Guard check to ensure main is entry point if __name__ == '__main__': try: bq_client = None #Establish BQ connection and run main() bq_client = bigquery.Client() main() # Catch exception for missing project and exit except EnvironmentError: print('Project not defined. Set your project using command: \n\tgcloud config set project.') sys.exit(1) # Catch auth error, print message and exit if connection fails except GoogleAuthError as error: print(f"ERROR: {error}") sys.exit(1) finally: # Regardless of outcome, close BQ connection to clean up if it exists if bq_client is not None: bq_client.close()