classify-split-extract-workflow/classify-job/bq_mlops.py (47 lines of code) (raw):
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=import-error
"""
BigQuery MLOps helper functions for managing tables and models.
This module includes functions for creating external tables, checking and creating
BigQuery tables, and creating or replacing remote models using Document AI processors.
"""
# pylint: disable=logging-fstring-interpolation
from typing import List, Optional
from config import BQ_DATASET_ID_MLOPS
from config import BQ_GCS_CONNECTION_NAME
from config import BQ_OBJECT_TABLE_RETENTION_DAYS
from config import BQ_PROJECT_ID
from config import BQ_REGION
from config import PROJECT_ID
from google.api_core.exceptions import NotFound
from google.cloud import bigquery
from google.cloud.documentai_v1 import Processor
from logging_handler import Logger
from utils import get_utc_timestamp
# Shared BigQuery client, constructed once when this module is imported and
# bound to PROJECT_ID. Note that the helpers below qualify table/model names
# with BQ_PROJECT_ID, which is a separate config value.
bq = bigquery.Client(project=PROJECT_ID)
# Module-level logger obtained from the project's Logger helper, keyed by this file's path.
logger = Logger.get_logger(__file__)
def object_table_create(
    f_uris: List[str],
    document_type: str,
    table_suffix: Optional[str] = None,
    retention_days: int = BQ_OBJECT_TABLE_RETENTION_DAYS,
) -> str:
    """
    Creates (or replaces) an external object table in BigQuery over document URIs.

    Args:
        f_uris (List[str]): List of file URIs to expose through the table.
        document_type (str): Type of the document; upper-cased and used as the
            table-name prefix (``<TYPE>_DOCUMENTS_<suffix>``).
        table_suffix (str, optional): Suffix for the table name. When omitted
            (None), the current UTC timestamp is taken at call time.
        retention_days (int, optional): Number of days before the table expires.
            Defaults to BQ_OBJECT_TABLE_RETENTION_DAYS.

    Returns:
        str: The fully-qualified name of the created BigQuery table.
    """
    # Resolve the default per call. A default of `get_utc_timestamp()` in the
    # signature is evaluated only once, at import time, so every defaulted call
    # would reuse the same suffix and CREATE OR REPLACE would overwrite the
    # previously created table.
    if table_suffix is None:
        table_suffix = get_utc_timestamp()

    # Each URI is embedded in a single-quoted SQL string literal below; escape
    # backslashes first, then single quotes, so object names containing either
    # character cannot terminate the literal or alter the statement.
    uris = "', '".join(
        uri.replace("\\", "\\\\").replace("'", "\\'") for uri in f_uris
    )

    object_table_name = (
        f"{BQ_PROJECT_ID}.{BQ_DATASET_ID_MLOPS}."
        f"{document_type.upper()}_DOCUMENTS_{table_suffix}"
    )

    # The SQL text is kept flush-left so the statement sent to BigQuery is
    # character-for-character the same as before (apart from URI escaping).
    query = f"""
CREATE OR REPLACE EXTERNAL TABLE `{object_table_name}`
WITH CONNECTION `{BQ_PROJECT_ID}.{BQ_REGION}.{BQ_GCS_CONNECTION_NAME}`
OPTIONS(
object_metadata = 'SIMPLE',
metadata_cache_mode = 'AUTOMATIC',
uris = ['{uris}'],
max_staleness = INTERVAL 1 DAY,
expiration_timestamp = TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL {retention_days} DAY)
)
"""
    job = bq.query(query=query)
    job.result()  # Block until the DDL job finishes (raises if it failed).
    logger.info(f"Created external table {object_table_name}")
    return object_table_name
def table_create(table_id: str) -> None:
    """
    Ensures a BigQuery table exists, creating an empty one when it is missing.

    Args:
        table_id (str): The BigQuery table ID to look up and, if absent, create.
    """
    try:
        # Existence probe: raises NotFound when the table is absent.
        bq.get_table(table_id)
    except NotFound:
        # No schema is supplied; the table is created from the bare ID only.
        created = bq.create_table(bigquery.Table(table_id))
        logger.info(f"Created table {created.table_id}.")
        return

    logger.info(f"Table {table_id} already exists.")
def remote_model_create(processor: Processor, model_name: Optional[str] = None) -> None:
    """
    Creates or replaces a BigQuery remote model backed by a Document AI processor.

    Args:
        processor (Processor): Document AI processor; its default processor
            version is registered as the model's DOCUMENT_PROCESSOR.
        model_name (str, optional): Name of the model. When empty or omitted, a
            name is derived from the upper-cased ``processor.name`` with a
            ``_MODEL`` suffix inside the MLOps dataset.
    """
    # NOTE(review): if processor.name is a full resource path (i.e. contains
    # '/'), the derived default may not be a valid model identifier - confirm
    # against callers, which may always pass model_name explicitly.
    target_model = model_name or (
        f"{BQ_PROJECT_ID}.{BQ_DATASET_ID_MLOPS}.{processor.name.upper()}_MODEL"
    )

    # SQL text is kept flush-left so the statement sent to BigQuery is unchanged.
    ddl = f"""
CREATE OR REPLACE MODEL `{target_model}`
REMOTE WITH CONNECTION `{BQ_PROJECT_ID}.{BQ_REGION}.{BQ_GCS_CONNECTION_NAME}`
OPTIONS(
REMOTE_SERVICE_TYPE = 'cloud_ai_document_v1',
DOCUMENT_PROCESSOR = "{processor.default_processor_version}"
)
"""
    # Submit the DDL and wait for it to complete before reporting success.
    bq.query(query=ddl).result()
    logger.info(f"Created or replaced remote model {target_model}")