transcoder/output/google_cloud/BigQueryOutputManager.py (100 lines of code) (raw):
#
# Copyright 2022 Google LLC
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
from google.cloud import bigquery
from google.cloud.exceptions import NotFound, Conflict
from transcoder.message import DatacastField, DatacastSchema
from transcoder.output import OutputManager
from transcoder.output.exception import BigQueryTableSchemaOutOfSyncError
from transcoder.output.google_cloud.Constants import GOOGLE_PACKAGED_SOLUTION_KEY, GOOGLE_PACKAGED_SOLUTION_LABEL_DICT, \
GOOGLE_PACKAGED_SOLUTION_VALUE
class BigQueryOutputManager(OutputManager):
"""Manages creation of BigQuery dataset and table objects"""
@staticmethod
def output_type_identifier():
return 'bigquery'
def __init__(self, project_id: str, dataset_id, output_prefix: str = None, lazy_create_resources: bool = False):
super().__init__(lazy_create_resources=lazy_create_resources)
self.project_id = project_id
self.dataset_id = dataset_id
self.dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
self.output_prefix = output_prefix
self.client = bigquery.Client(project=project_id)
if self._does_dataset_exist(self.dataset_ref) is False:
self._create_dataset(self.dataset_ref)
else:
dataset = self.client.get_dataset(self.dataset_ref)
if GOOGLE_PACKAGED_SOLUTION_KEY not in dataset.labels or \
dataset.labels.get(GOOGLE_PACKAGED_SOLUTION_KEY, None) != GOOGLE_PACKAGED_SOLUTION_VALUE:
dataset.labels.update(GOOGLE_PACKAGED_SOLUTION_LABEL_DICT)
self.client.update_dataset(dataset, ["labels"])
self.tables = list(self.client.list_tables(dataset_id))
for table_id in list(map(lambda x: x.table_id, self.tables)):
self.existing_schemas.update({table_id: True})
def _create_field(self, field: DatacastField):
return field.create_bigquery_field()
def _does_table_exist(self, name):
table_ref = bigquery.TableReference(self.dataset_ref, name)
try:
self.client.get_table(table_ref)
logging.debug('Table %s already exists.', table_ref.table_id)
return True
except NotFound:
logging.debug('Table %s is not found.', table_ref.table_id)
return False
@staticmethod
def _is_schema_equal(schema_1, schema_2):
if len(schema_1) != len(schema_2):
logging.debug('Schema list length difference')
return False
field_count = range(len(schema_1))
for i in field_count:
f_1: bigquery.SchemaField = schema_1[i]
f_2: bigquery.SchemaField = schema_2[i]
f_1_api_repr = f_1.to_api_repr()
f_2_api_repr = f_2.to_api_repr()
if f_1_api_repr != f_2_api_repr:
logging.debug('Schema field compare is not equal:\nschema_1: %s\nschema_2: %s', f_1_api_repr,
f_2_api_repr)
return False
return True
def _add_schema(self, schema: DatacastSchema):
bq_schema = self._get_field_list(schema.fields)
table_ref = bigquery.TableReference(self.dataset_ref, schema.name)
if self._does_table_exist(schema.name) is True:
existing_table = self.client.get_table(table_ref)
if GOOGLE_PACKAGED_SOLUTION_KEY not in existing_table.labels \
or existing_table.labels.get(GOOGLE_PACKAGED_SOLUTION_KEY, None) != GOOGLE_PACKAGED_SOLUTION_VALUE:
existing_table.labels.update(GOOGLE_PACKAGED_SOLUTION_LABEL_DICT)
try:
self.client.update_table(existing_table, ["labels"])
except Exception as err: # pylint: disable=broad-except
logging.warning("Failed to update table labels: %s", err)
if self._is_schema_equal(existing_table.schema, bq_schema) is False:
raise BigQueryTableSchemaOutOfSyncError(
f'The schema for table "{table_ref}" differs from the definition for schema "{schema.name}"')
else:
table = bigquery.Table(table_ref, schema=bq_schema)
table.labels = GOOGLE_PACKAGED_SOLUTION_LABEL_DICT
try:
self.client.create_table(table, exists_ok=True)
except Conflict as error:
# MS: 2022-10-12 Adding exists_ok to create_table call so that the Conflict is not raised
# b/153072942
# https://cloud.google.com/bigquery/docs/error-messages
logging.warning('Table conflict, already exists %s: %s', schema.name, error)
except Exception as error:
logging.error('Error creating table %s: %s', schema.name, error)
raise
def _write_record(self, record_type_name, record):
table_ref = bigquery.TableReference(self.dataset_ref, record_type_name)
errors = self.client.insert_rows_json(table_ref, [record])
if errors:
logging.error('Encountered errors while inserting rows: %s', errors)
def _create_dataset(self, dataset_ref):
dataset = bigquery.Dataset(dataset_ref)
dataset.labels = GOOGLE_PACKAGED_SOLUTION_LABEL_DICT
dataset = self.client.create_dataset(dataset, timeout=30)
logging.debug('Created dataset %s.%s', self.client.project, dataset.dataset_id)
def _does_dataset_exist(self, dataset_ref) -> bool:
try:
self.client.get_dataset(dataset_ref)
logging.debug('Dataset %s already exists', dataset_ref)
return True
except NotFound:
logging.debug('Dataset %s is not found', dataset_ref)
return False