In [None]:
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Environment Setup

Install the following Python packages to set up the environment.

In [None]:
! pip install -U google-cloud-datacatalog
! pip install -U google-cloud-storage
! pip install -U google-cloud-bigquery
! pip install -U numpy

Specify your project ID in the next cell.

In [None]:
PROJECT_ID = '<YOUR PROJECT ID>'  # Change to your project ID
LOCATION = 'us-central1'
DATASET_ID = 'cdp_dataset'

# Tag template 
TAG_TEMPLATE_ID = 'llmcdptemplate'
TAG_TEMPLATE_PATH = f"projects/{PROJECT_ID}/locations/{LOCATION}/tagTemplates/{TAG_TEMPLATE_ID}"

# Set the project id
! gcloud config set project {PROJECT_ID}

### BigQuery: Create dataset

Create a BigQuery dataset to upload the CDP data.

In [None]:
# Create the BigQuery dataset named by DATASET_ID in your project.
from google.cloud import bigquery
from google.cloud import datacatalog_v1

# Clients are created once here and reused by the cells further down.
bq_client = bigquery.Client(project=PROJECT_ID)
datacatalog_client = datacatalog_v1.DataCatalogClient()

# Fully qualified dataset id: "<project>.<dataset>".
dataset_id = f"{bq_client.project}.{DATASET_ID}"
dataset = bigquery.Dataset(dataset_id)
dataset.location = "US"

# exists_ok=True makes this cell safe to re-run: an already existing dataset
# is returned instead of raising a conflict error.
try:
    dataset = bq_client.create_dataset(dataset, exists_ok=True, timeout=30)
except Exception as e:
    # Report the failure with context and stop here; the following cells
    # cannot work without the dataset.
    print(f'Could not create dataset {DATASET_ID}: {e}')
    raise
else:
    print(f'Dataset {DATASET_ID} is ready.')

### BigQuery: Create tables and populate with data

The next cell generates synthetic data for the tables and loads it into BigQuery.

> This process will take approximately 1 minute and 30 seconds.

If this process fails, try to recreate the dataset with the cell above and regenerate the data.

In [None]:
from aux_data import data_gen

# Generate the table data and load it into the dataset configured above.
data_gen.generate_and_populate_dataset(PROJECT_ID=PROJECT_ID, DATASET_ID=DATASET_ID)

### Setup Data Catalog

The cell below will execute the following steps:

1) Specify a query to retrieve the metadata from the tables you just uploaded;
2) Create a TagTemplate on Google Dataplex that specifies how the table will be tagged with metadata;
3) Tag all the tables you created on BigQuery.

In [None]:
from aux_data import bq_tag_generation

# Arguments are positional: project id, dataset id, tag template id, location.
bq_tag_generation.create_template_and_tag_bq(
    PROJECT_ID, DATASET_ID, TAG_TEMPLATE_ID, LOCATION
)

# Quick test

Test the integration by retrieving the metadata from BigQuery tables.

In [None]:
# Lists every table in the dataset whose name does not contain "metadata".
QUERY = (
    f'SELECT * FROM `{PROJECT_ID}.{DATASET_ID}.INFORMATION_SCHEMA.TABLES` '
    'WHERE table_name NOT LIKE "%metadata%"'
)

In [None]:
def get_tags_from_table(table_id):
    """Return a text summary of the Data Catalog tags attached to a table.

    Looks up the Data Catalog entry linked to the BigQuery table
    ``PROJECT_ID.DATASET_ID.table_id``, lists the tags on that entry and keeps
    only those whose template equals ``TAG_TEMPLATE_PATH``.

    Args:
        table_id: Name of the table inside ``DATASET_ID``.

    Returns:
        A string with one newline-terminated line per matching tag, giving the
        table, column, data type, primary-key flag, foreign-key flag and
        description. Empty string when no tag matches.
    """
    # Linked-resource name used by Data Catalog to identify the BigQuery table.
    resource_name = (
        f"//bigquery.googleapis.com/projects/{PROJECT_ID}"
        f"/datasets/{DATASET_ID}/tables/{table_id}"
    )
    table_entry = datacatalog_client.lookup_entry(
        request={"linked_resource": resource_name}
    )

    tag_lines = []
    for tag in datacatalog_client.list_tags(parent=table_entry.name):
        # Skip tags created from any other template.
        if tag.template != TAG_TEMPLATE_PATH:
            continue

        fields = tag.fields
        desc = fields["description"].string_value
        data_type = fields["data_type"].string_value
        pk = fields["is_primary_key"].bool_value
        fk = fields["is_foreign_key"].bool_value
        tag_lines.append(
            "Table: {} "
            "- Column: {} "
            "- Data Type: {} "
            "- Primary Key: {} "
            "- Foreign Key: {} "
            "- Description: {}\n".format(
                table_id, tag.column, data_type, pk, fk, desc
            )
        )

    return "".join(tag_lines)

In [None]:
def get_metadata_from_dataset(
        query: str
):
    """Run ``query`` on BigQuery and collect per-table metadata.

    Each result row must expose ``ddl`` and ``table_name``. For every row the
    function returns a dict with the row's DDL under ``'ddl'`` and the tag
    summary produced by ``get_tags_from_table`` under ``'description'``.

    Args:
        query: SQL text passed to ``bq_client.query``.

    Returns:
        A list of dicts, one per result row, in result order.
    """
    result_rows = bq_client.query(query).result()  # API request
    return [
        {
            'ddl': record.ddl,
            'description': get_tags_from_table(record.table_name),
        }
        for record in result_rows
    ]

In [None]:
# Fetch the metadata for every table and print each table's tag summary.
tags = get_metadata_from_dataset(QUERY)
for table_metadata in tags:
    print(table_metadata['description'])