perfkitbenchmarker/providers/gcp/bigquery.py (533 lines of code) (raw):

# Copyright 2018 PerfKitBenchmarker Authors. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Module containing class for GCP's Bigquery EDW service.""" import copy import datetime import json import logging import os import re from typing import Any from absl import flags from perfkitbenchmarker import data from perfkitbenchmarker import edw_service from perfkitbenchmarker import provider_info from perfkitbenchmarker import vm_util from perfkitbenchmarker.linux_packages import google_cloud_sdk from perfkitbenchmarker.providers.gcp import util as gcp_util FLAGS = flags.FLAGS BQ_CLIENT_FILE = 'bq-jdbc-simba-client-1.8-temp-labels.jar' BQ_PYTHON_CLIENT_FILE = 'bq_python_driver.py' BQ_PYTHON_CLIENT_DIR = 'edw/bigquery/clients/python' DEFAULT_TABLE_EXPIRATION = 3600 * 24 * 365 # seconds BQ_JDBC_INTERFACES = ['SIMBA_JDBC_1_6_3_1004', 'GOOGLE_JDBC'] BQ_JDBC_CLIENT_FILE = { 'SIMBA_JDBC_1_6_3_1004': 'bq-jdbc-client-2.4.jar', 'GOOGLE_JDBC': 'bq-google-jdbc-client-1.0.jar', } BQ_JDBC_JAR_FILE = { 'SIMBA_JDBC_1_6_3_1004': 'GoogleBigQueryJDBC42_1_6_3.jar', 'GOOGLE_JDBC': 'jdbc-jar-with-dependencies-20250129.jar', } class GenericClientInterface(edw_service.EdwClientInterface): """Generic Client Interface class for BigQuery. Attributes: project_id: String name of the BigQuery project to benchmark dataset_id: String name of the BigQuery dataset to benchmark """ def __init__(self, project_id: str, dataset_id: str): self.project_id = project_id self.dataset_id = dataset_id def GetMetadata(self) -> dict[str, str]: """Gets the Metadata attributes for the Client Interface.""" return {'client': FLAGS.bq_client_interface} def RunQueryWithResults(self, query_name: str) -> str: raise NotImplementedError def GetBigQueryClientInterface( project_id: str, dataset_id: str ) -> GenericClientInterface: """Builds and Returns the requested BigQuery client Interface. Args: project_id: String name of the BigQuery project to benchmark dataset_id: String name of the BigQuery dataset to benchmark Returns: A concrete Client Interface object (subclass of GenericClientInterface) Raises: RuntimeError: if an unsupported bq_client_interface is requested """ if FLAGS.bq_client_interface == 'CLI': return CliClientInterface(project_id, dataset_id) if FLAGS.bq_client_interface == 'JAVA': return JavaClientInterface(project_id, dataset_id) if FLAGS.bq_client_interface in BQ_JDBC_INTERFACES: return JdbcClientInterface(project_id, dataset_id) if FLAGS.bq_client_interface == 'PYTHON': return PythonClientInterface(project_id, dataset_id) raise RuntimeError('Unknown BigQuery Client Interface requested.') class CliClientInterface(GenericClientInterface): """Command Line Client Interface class for BigQuery. Uses the native Bigquery client that ships with the google_cloud_sdk https://cloud.google.com/bigquery/docs/bq-command-line-tool. """ def Prepare(self, package_name: str) -> None: """Prepares the client vm to execute query. Installs the bq tool dependencies and authenticates using a service account. Args: package_name: String name of the package defining the preprovisioned data (certificates, etc.) to extract and use during client vm preparation. """ self.client_vm.Install('pip') self.client_vm.RemoteCommand('sudo pip install absl-py') self.client_vm.Install('google_cloud_sdk') # Push the service account file to the working directory on client vm key_file_name = FLAGS.gcp_service_account_key_file.split('/')[-1] if '/' in FLAGS.gcp_service_account_key_file: self.client_vm.PushFile(FLAGS.gcp_service_account_key_file) else: self.client_vm.InstallPreprovisionedPackageData( package_name, [FLAGS.gcp_service_account_key_file], '' ) # Authenticate using the service account file vm_gcloud_path = google_cloud_sdk.GCLOUD_PATH activate_cmd = '{} auth activate-service-account {} --key-file={}'.format( vm_gcloud_path, FLAGS.gcp_service_account, key_file_name ) self.client_vm.RemoteCommand(activate_cmd) # Push the framework to execute a sql query and gather performance details service_specific_dir = os.path.join('edw', Bigquery.SERVICE_TYPE) self.client_vm.PushFile( data.ResourcePath( os.path.join(service_specific_dir, 'script_runner.sh') ) ) runner_permission_update_cmd = 'chmod 755 {}'.format('script_runner.sh') self.client_vm.RemoteCommand(runner_permission_update_cmd) self.client_vm.PushFile( data.ResourcePath(os.path.join('edw', 'script_driver.py')) ) self.client_vm.PushFile( data.ResourcePath( os.path.join( service_specific_dir, 'provider_specific_script_driver.py' ) ) ) def ExecuteQuery( self, query_name: str, print_results: bool = False ) -> tuple[float, dict[str, Any]]: """Executes a query and returns performance details. Args: query_name: String name of the query to execute print_results: Whether to include query results in execution details. Returns: A tuple of (execution_time, execution details) execution_time: A Float variable set to the query's completion time in secs. -1.0 is used as a sentinel value implying the query failed. For a successful query the value is expected to be positive. performance_details: A dictionary of query execution attributes eg. job_id """ query_command = ( 'python script_driver.py --script={} --bq_project_id={} ' '--bq_dataset_id={}' ).format(query_name, self.project_id, self.dataset_id) if print_results: query_command += ' --print_results=true' stdout, _ = self.client_vm.RemoteCommand(query_command) performance = json.loads(stdout) details = copy.copy(self.GetMetadata()) # Copy the base metadata details['job_id'] = performance[query_name]['job_id'] return float(performance[query_name]['execution_time']), details class JdbcClientInterface(GenericClientInterface): """JDBC Client Interface class for BigQuery. https://cloud.google.com/bigquery/providers/simba-drivers """ def SetProvisionedAttributes(self, benchmark_spec): super().SetProvisionedAttributes(benchmark_spec) self.project_id = re.split( r'\.', benchmark_spec.edw_service.cluster_identifier )[0] self.dataset_id = re.split( r'\.', benchmark_spec.edw_service.cluster_identifier )[1] def Prepare(self, package_name: str) -> None: """Prepares the client vm to execute query. Installs a) Java Execution Environment, b) BigQuery Authnetication Credentials, c) JDBC Application to execute a query and gather execution details, d) Simba JDBC BigQuery client code dependencencies, and e) The Simba JDBC interface jar Args: package_name: String name of the package defining the preprovisioned data (certificates, etc.) to extract and use during client vm preparation. """ self.client_vm.Install('openjdk') # Push the service account file to the working directory on client vm self.client_vm.InstallPreprovisionedPackageData( package_name, [FLAGS.gcp_service_account_key_file], '' ) # Push the executable jars to the working directory on client vm self.client_vm.InstallPreprovisionedPackageData( package_name, [ BQ_JDBC_CLIENT_FILE[FLAGS.bq_client_interface], BQ_JDBC_JAR_FILE[FLAGS.bq_client_interface], ], '', ) def ExecuteQuery( self, query_name: str, print_results: bool = False ) -> tuple[float, dict[str, Any]]: """Executes a query and returns performance details. Args: query_name: String name of the query to execute print_results: Whether to include query results in execution details. Returns: A tuple of (execution_time, execution details) execution_time: A Float variable set to the query's completion time in secs. -1.0 is used as a sentinel value implying the query failed. For a successful query the value is expected to be positive. performance_details: A dictionary of query execution attributes eg. job_id """ query_command = ( 'java -cp {}:{} ' 'com.google.cloud.performance.edw.App --project {} --service_account ' '{} --credentials_file {} --dataset {} --query_file {}'.format( BQ_JDBC_CLIENT_FILE[FLAGS.bq_client_interface], BQ_JDBC_JAR_FILE[FLAGS.bq_client_interface], self.project_id, FLAGS.gcp_service_account, FLAGS.gcp_service_account_key_file, self.dataset_id, query_name, ) ) if print_results: query_command += ' --print_results true' stdout, _ = self.client_vm.RemoteCommand(query_command) details = copy.copy(self.GetMetadata()) # Copy the base metadata details.update(json.loads(stdout)['details']) return json.loads(stdout)['performance'], details class JavaClientInterface(GenericClientInterface): """Native Java Client Interface class for BigQuery. https://cloud.google.com/bigquery/docs/reference/libraries#client-libraries-install-java """ def Prepare(self, package_name: str) -> None: """Prepares the client vm to execute query. Installs the Java Execution Environment and a uber jar with a) BigQuery Java client libraries, b) An application to execute a query and gather execution details, and c) their dependencies. Args: package_name: String name of the package defining the preprovisioned data (certificates, etc.) to extract and use during client vm preparation. """ self.client_vm.Install('openjdk') # Push the service account file to the working directory on client vm if '/' in FLAGS.gcp_service_account_key_file: self.client_vm.PushFile(FLAGS.gcp_service_account_key_file) else: self.client_vm.InstallPreprovisionedPackageData( package_name, [FLAGS.gcp_service_account_key_file], '' ) # Push the executable jar to the working directory on client vm self.client_vm.InstallPreprovisionedPackageData( package_name, [BQ_CLIENT_FILE], '' ) def ExecuteQuery( self, query_name: str, print_results: bool = False ) -> tuple[float, dict[str, Any]]: """Executes a query and returns performance details. Args: query_name: String name of the query to execute. print_results: Whether to include query results in execution details. Returns: A tuple of (execution_time, execution details) execution_time: A Float variable set to the query's completion time in secs. -1.0 is used as a sentinel value implying the query failed. For a successful query the value is expected to be positive. performance_details: A dictionary of query execution attributes eg. job_id """ key_file_name = FLAGS.gcp_service_account_key_file if '/' in FLAGS.gcp_service_account_key_file: key_file_name = FLAGS.gcp_service_account_key_file.split('/')[-1] query_command = ( 'java -Xmx6g -cp {} ' 'com.google.cloud.performance.edw.Single --project {} ' '--credentials_file {} --dataset {} ' '--query_file {}' ).format( BQ_CLIENT_FILE, self.project_id, key_file_name, self.dataset_id, query_name, ) if print_results: query_command += ' --print_results true' stdout, _ = self.client_vm.RemoteCommand(query_command) details = copy.copy(self.GetMetadata()) # Copy the base metadata details.update(json.loads(stdout)['details']) return json.loads(stdout)['query_wall_time_in_secs'], details def ExecuteSimultaneous( self, submission_interval: int, queries: list[str] ) -> str: """Executes queries simultaneously on client and return performance details. Simultaneous app expects queries as white space separated query file names. Args: submission_interval: Simultaneous query submission interval in milliseconds. queries: List of strings (names) of queries to execute. Returns: A serialized dictionary of execution details. """ key_file_name = FLAGS.gcp_service_account_key_file if '/' in FLAGS.gcp_service_account_key_file: key_file_name = os.path.basename(FLAGS.gcp_service_account_key_file) cmd = ( 'java -Xmx6g -cp {} ' 'com.google.cloud.performance.edw.Simultaneous --project {} ' '--credentials_file {} --dataset {} --submission_interval {} ' '--query_files {}'.format( BQ_CLIENT_FILE, self.project_id, key_file_name, self.dataset_id, submission_interval, ' '.join(queries), ) ) stdout, _ = self.client_vm.RemoteCommand(cmd) return stdout def ExecuteThroughput( self, concurrency_streams: list[list[str]], labels: dict[str, str] | None = None, ) -> str: key_file_name = FLAGS.gcp_service_account_key_file if '/' in FLAGS.gcp_service_account_key_file: key_file_name = os.path.basename(FLAGS.gcp_service_account_key_file) runlabels = '' if labels: for key, value in labels.items(): runlabels += f' --label {key}={value}' cmd = ( 'java -Xmx6g -cp {} ' 'com.google.cloud.performance.edw.Throughput --project {} ' '--credentials_file {} --dataset {} --query_streams {}'.format( BQ_CLIENT_FILE, self.project_id, key_file_name, self.dataset_id, ' '.join([','.join(stream) for stream in concurrency_streams]), ) + runlabels ) stdout, _ = self.client_vm.RemoteCommand(cmd) return stdout class PythonClientInterface(GenericClientInterface): """Python Client Interface class for BigQuery.""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.destination: str | None = None self.key_file_name = FLAGS.gcp_service_account_key_file if '/' in FLAGS.gcp_service_account_key_file: self.key_file_name = os.path.basename(FLAGS.gcp_service_account_key_file) def Prepare(self, package_name: str) -> None: """Prepares the client vm to execute query.""" # Push the service account file to the working directory on client vm if '/' in FLAGS.gcp_service_account_key_file: self.client_vm.PushFile(FLAGS.gcp_service_account_key_file) else: self.client_vm.InstallPreprovisionedPackageData( package_name, [FLAGS.gcp_service_account_key_file], '' ) # Install dependencies for driver self.client_vm.Install('pip') self.client_vm.RemoteCommand( 'sudo apt-get -qq update && DEBIAN_FRONTEND=noninteractive sudo apt-get' ' -qq install python3.12-venv' ) self.client_vm.RemoteCommand('python3 -m venv .venv') self.client_vm.RemoteCommand( 'source .venv/bin/activate && pip install google-cloud-bigquery' ' google-cloud-bigquery-storage pyarrow' ) # Push driver script to client vm self.client_vm.PushDataFile( os.path.join(BQ_PYTHON_CLIENT_DIR, BQ_PYTHON_CLIENT_FILE) ) def ExecuteQuery( self, query_name: str, print_results: bool = False ) -> tuple[float, dict[str, Any]]: """Executes a query and returns performance details.""" cmd = ( f'.venv/bin/python {BQ_PYTHON_CLIENT_FILE} single --project' f' {self.project_id} --credentials_file {self.key_file_name} --dataset' f' {self.dataset_id} --query_file {query_name} --feature_config' f' {FLAGS.edw_bq_feature_config}' ) if print_results: cmd += ' --print_results' if self.destination: cmd += f' --destination {self.destination}' stdout, _ = self.client_vm.RemoteCommand(cmd) details = copy.copy(self.GetMetadata()) details.update(json.loads(stdout)['details']) return json.loads(stdout)['query_wall_time_in_secs'], details def ExecuteThroughput( self, concurrency_streams: list[list[str]], labels: dict[str, str] | None = None, ) -> str: """Executes queries simultaneously on client and return performance details.""" cmd = ( f'.venv/bin/python {BQ_PYTHON_CLIENT_FILE} throughput --project' f' {self.project_id} --credentials_file {self.key_file_name} --dataset' f" {self.dataset_id} --query_streams='{json.dumps(concurrency_streams)}'" f' --feature_config {FLAGS.edw_bq_feature_config} --labels' f" '{json.dumps(labels)}'" ) stdout, _ = self.client_vm.RemoteCommand(cmd) return stdout def RunQueryWithResults(self, query_name: str) -> str: """Executes a query and returns performance details and query output.""" cmd = ( f'.venv/bin/python {BQ_PYTHON_CLIENT_FILE} single --project' f' {self.project_id} --credentials_file {self.key_file_name} --dataset' f' {self.dataset_id} --query_file {query_name} --print_results' ) stdout, _ = self.client_vm.RemoteCommand(cmd) return stdout class Bigquery(edw_service.EdwService): """Object representing a Bigquery cluster. Attributes: job_id_prefix: A string prefix for the job id for bigquery job. """ CLOUD = provider_info.GCP SERVICE_TYPE = 'bigquery' RUN_COST_QUERY_TEMPLATE = 'edw/bigquery/run_cost_query.sql.j2' client_interface: GenericClientInterface def __init__(self, edw_service_spec): super().__init__(edw_service_spec) project_id = re.split(r'\.', self.cluster_identifier)[0] dataset_id = re.split(r'\.', self.cluster_identifier)[1] self.client_interface = GetBigQueryClientInterface(project_id, dataset_id) def _Create(self): """Create a BigQuery cluster. Bigquery clusters creation is out of scope of the benchmarking. """ raise NotImplementedError def _Exists(self): """Method to validate the existence of a Bigquery cluster. Returns: Boolean value indicating the existence of a cluster. """ return True def _Delete(self): """Delete a BigQuery cluster. Bigquery cluster deletion is out of scope of benchmarking. """ raise NotImplementedError def GetMetadata(self): """Return a dictionary of the metadata for the BigQuery cluster.""" basic_data = super().GetMetadata() basic_data.update(self.client_interface.GetMetadata()) return basic_data def FormatProjectAndDatasetForCommand(self, dataset=None): """Returns the project and dataset in the format needed for bq commands. E.g., project:dataset. Args: dataset: The dataset to run commands against. If None, extracts the dataset from the cluster identifier whose format is "project.dataset"). """ return ( (self.cluster_identifier.split('.')[0] + ':' + dataset) if dataset else self.cluster_identifier.replace('.', ':') ) def GetDatasetLastUpdatedTime(self, dataset=None): """Get the formatted last modified timestamp of the dataset.""" cmd = [ 'bq', 'show', '--format=prettyjson', self.FormatProjectAndDatasetForCommand(dataset), ] dataset_metadata, _, _ = vm_util.IssueCommand(cmd) metadata_json = json.loads(str(dataset_metadata)) return datetime.datetime.fromtimestamp( float(metadata_json['lastModifiedTime']) / 1000.0 ).strftime('%Y-%m-%d_%H-%M-%S') def GetAllTablesInDataset(self, dataset=None): """Returns a list of the IDs of all the tables in the dataset.""" cmd = [ 'bq', 'ls', '--format=prettyjson', self.FormatProjectAndDatasetForCommand(dataset), ] tables_list, _, _ = vm_util.IssueCommand(cmd) all_tables = [] for table in json.loads(str(tables_list)): if table['type'] == 'TABLE': all_tables.append(table['tableReference']['tableId']) return all_tables def ExtractDataset( self, dest_bucket, dataset=None, tables=None, dest_format='CSV' ): """Extract all tables in a dataset to a GCS bucket. Args: dest_bucket: Name of the bucket to extract the data to. Should already exist. dataset: Optional name of the dataset. If none, will be extracted from the cluster_identifier. tables: Optional list of table names to extract. If none, all tables in the dataset will be extracted. dest_format: Format to extract data in. Can be one of: CSV, JSON, or Avro. """ if tables is None: tables = self.GetAllTablesInDataset(dataset) gcs_uri = 'gs://' + dest_bucket # Make sure the bucket is empty. vm_util.IssueCommand( ['gsutil', '-m', 'rm', gcs_uri + '/**'], raise_on_failure=False ) project_dataset = self.FormatProjectAndDatasetForCommand(dataset) for table in tables: cmd = [ 'bq', 'extract', '--destination_format=%s' % dest_format, '%s.%s' % (project_dataset, table), '%s/%s/*.csv' % (gcs_uri, table), ] _, stderr, retcode = vm_util.IssueCommand(cmd) # There is a 10T daily limit on extracting from BQ. Large datasets will # inherently hit this limit and benchmarks shouldn't use those. gcp_util.CheckGcloudResponseKnownFailures(stderr, retcode) def RemoveDataset(self, dataset=None): """Removes a dataset. See https://cloud.google.com/bigquery/docs/managing-tables#deleting_tables Args: dataset: Optional name of the dataset. If none, will be extracted from the cluster_identifier. """ project_dataset = self.FormatProjectAndDatasetForCommand(dataset) vm_util.IssueCommand( ['bq', 'rm', '-r', '-f', '-d', project_dataset], raise_on_failure=False ) def CreateDataset(self, dataset=None, description=None): """Creates a new dataset. See https://cloud.google.com/bigquery/docs/tables Args: dataset: Optional name of the dataset. If none, will be extracted from the cluster_identifier. description: Optional description of the dataset. Escape double quotes. """ project_dataset = self.FormatProjectAndDatasetForCommand(dataset) cmd = [ 'bq', 'mk', '--dataset', '--default_table_expiration=%d' % DEFAULT_TABLE_EXPIRATION, ] if description: cmd.extend(['--description', '"%s"' % description]) cmd.append(project_dataset) vm_util.IssueCommand(cmd) cmd = ['bq', 'update'] for key, value in gcp_util.GetDefaultTags().items(): cmd.extend(['--set_label', f'{key}:{value}']) cmd.append(project_dataset) vm_util.IssueCommand(cmd) def LoadDataset( self, # pytype: disable=signature-mismatch # overriding-parameter-count-checks source_bucket, tables, schema_dir, dataset=None, append=True, skip_header_row=True, field_delimiter=',', ): """Load all tables in a dataset to a database from CSV object storage. See https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv Args: source_bucket: Name of the bucket to load the data from. Should already exist. Each table must have its own subfolder in the bucket named after the table, containing one or more csv files that make up the table data. tables: List of table names to load. schema_dir: GCS directory containing json schemas of all tables to load. dataset: Optional name of the dataset. If none, will be extracted from the cluster_identifier. append: If True, appends loaded data to the existing set. If False, replaces the existing data (if any). skip_header_row: If True, skips the first row of data being loaded. field_delimiter: The separator for fields in the CSV file. """ project_dataset = self.FormatProjectAndDatasetForCommand(dataset) for table in tables: schema_path = schema_dir + table + '.json' local_schema = './%s.json' % table vm_util.IssueCommand(['gsutil', 'cp', schema_path, local_schema]) cmd = [ 'bq', 'load', '--noreplace' if append else '--replace', '--source_format=CSV', '--field_delimiter=%s' % field_delimiter, '--skip_leading_rows=%d' % (1 if skip_header_row else 0), '%s.%s' % (project_dataset, table), 'gs://%s/%s/*.csv' % (source_bucket, table), local_schema, ] _, stderr, retcode = vm_util.IssueCommand(cmd, raise_on_failure=False) if retcode: logging.warning( 'Loading table %s failed. stderr: %s, retcode: %s', table, stderr, retcode, ) cmd = ['bq', 'update'] for key, value in gcp_util.GetDefaultTags().items(): cmd.extend(['--set_label', f'{key}:{value}']) cmd.append(f'{project_dataset}.{table}') vm_util.IssueCommand(cmd) def GetDatasetRegion(self, dataset=None): """Get the region that a dataset resides in.""" cmd = [ 'bq', 'show', '--format=prettyjson', self.FormatProjectAndDatasetForCommand(dataset), ] dataset_metadata, _, _ = vm_util.IssueCommand(cmd) metadata_json = json.loads(str(dataset_metadata)) return str(metadata_json['location']).lower() def OpenDataset(self, dataset: str): self.client_interface.dataset_id = dataset def CopyTable(self, copy_table_name: str, to_dataset: str) -> None: source = f'{self.FormatProjectAndDatasetForCommand()}.{copy_table_name}' dest = f'{self.FormatProjectAndDatasetForCommand(to_dataset)}.{copy_table_name}' cmd = ['bq', 'cp', source, dest] vm_util.IssueCommand(cmd) def GetAutoscaleSlotSeconds(self, run_iter_id: str) -> int: query_file_name = f'cost_query_{run_iter_id}' context = { 'run_identifier': run_iter_id, 'project_dot_region': ( f'{self.client_interface.project_id}.region-{self.GetDatasetRegion()}' ), } self.client_interface.client_vm.RenderTemplate( data.ResourcePath(self.RUN_COST_QUERY_TEMPLATE), query_file_name, context, ) output = json.loads( self.client_interface.RunQueryWithResults(query_file_name) ) run_cost = output['details']['query_results']['billed_slot_seconds'][0] return run_cost def GetIterationAuxiliaryMetrics(self, iter_run_key: str) -> dict[str, Any]: service_auxiliary_metrics = {} try: run_cost = self.GetAutoscaleSlotSeconds(iter_run_key) service_auxiliary_metrics['edw_bq_autoscale_slot_seconds'] = { 'value': run_cost, 'unit': 'slot-seconds', } return service_auxiliary_metrics except NotImplementedError: # No metrics support in client interface. return {} class Endor(Bigquery): """Class representing BigQuery Endor service.""" SERVICE_TYPE = 'endor' def GetMetadata(self) -> dict[str, str]: """Return a dictionary of the metadata for the BigQuery Endor service. Returns: A dictionary set to Endor service details. """ basic_data = super().GetMetadata() basic_data['edw_service_type'] = 'endor' basic_data.update(self.client_interface.GetMetadata()) basic_data.update(self.GetDataDetails()) return basic_data def GetDataDetails(self) -> dict[str, str]: """Returns a dictionary with underlying data details. cluster_identifier = <project_id>.<dataset_id> Data details are extracted from the dataset_id that follows the format: <dataset>_<format>_<compression>_<partitioning>_<location> eg. tpch100_parquet_uncompressed_unpartitoned_s3 Returns: A dictionary set to underlying data's details (format, etc.) """ data_details = {} dataset_id = re.split(r'\.', self.cluster_identifier)[1] parsed_id = re.split(r'_', dataset_id) data_details['format'] = parsed_id[1] data_details['compression'] = parsed_id[2] data_details['partitioning'] = parsed_id[3] data_details['location'] = parsed_id[4] return data_details class Endorazure(Endor): """Class representing BigQuery Endor Azure service.""" SERVICE_TYPE = 'endorazure' def GetMetadata(self) -> dict[str, str]: """Return a dictionary of the metadata for the BigQuery Endor Azure service. Returns: A dictionary set to Endor Azure service details. """ basic_data = super().GetMetadata() basic_data['edw_service_type'] = 'endorazure' return basic_data class Bqfederated(Bigquery): """Class representing BigQuery Federated service.""" SERVICE_TYPE = 'bqfederated' def GetMetadata(self) -> dict[str, str]: """Return a dictionary of the metadata for the BigQuery Federated service. Returns: A dictionary set to Federated service details. """ basic_data = super().GetMetadata() basic_data['edw_service_type'] = self.SERVICE_TYPE basic_data.update(self.client_interface.GetMetadata()) basic_data.update(self.GetDataDetails()) return basic_data def GetDataDetails(self) -> dict[str, str]: """Returns a dictionary with underlying data details. cluster_identifier = <project_id>.<dataset_id> Data details are extracted from the dataset_id which follows one of these three formats: <dataset>_<format>_<compression>_<partitioning>_<location> or <dataset>_<format>_<compression>_<partitioning>_<storage>_<location> or <dataset>_<format>_<table_format>_<compression>_<partitioning>_<storage>_<location> E.g: tpcds1000_parquet_compressed_partitioned_gcs tpcds1000_parquet_snappy_part_gcs_uscentral1 tpcds1000_parquet_iceberg_snappy_part_gcs_us Returns: A dictionary set to underlying data's details (format, etc.) """ data_details = {} project_id, dataset_id = re.split(r'\.', self.cluster_identifier) data_details['metadata_caching'] = str('metadata-caching' in project_id) parsed_id = re.split(r'_', dataset_id) if len(parsed_id) == 5: data_details['format'] = parsed_id[1] data_details['table_format'] = 'None' data_details['compression'] = parsed_id[2] data_details['partitioning'] = parsed_id[3] data_details['storage'] = parsed_id[4] data_details['location'] = 'us' elif len(parsed_id) == 6: data_details['format'] = parsed_id[1] data_details['table_format'] = 'None' data_details['compression'] = parsed_id[2] data_details['partitioning'] = parsed_id[3] data_details['storage'] = parsed_id[4] data_details['location'] = parsed_id[5] elif len(parsed_id) == 7: data_details['format'] = parsed_id[1] data_details['table_format'] = parsed_id[2] data_details['compression'] = parsed_id[3] data_details['partitioning'] = parsed_id[4] data_details['storage'] = parsed_id[5] data_details['location'] = parsed_id[6] else: data_details['format'] = 'unknown' data_details['table_format'] = 'unknown' data_details['compression'] = 'unknown' data_details['partitioning'] = 'unknown' data_details['storage'] = 'unknown' data_details['location'] = 'unknown' return data_details