services/jobs/fetch_projects/transfer_controller.py (74 lines of code) (raw):
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module defines the TransferController class, which orchestrates the
transfer of data from the Google Cloud Data Catalog to BigQuery. It utilizes the
DatacatalogApiAdapter and BigQueryAdapter to fetch and store data, respectively.
Classes:
- TransferController: A controller class for managing the transfer of tag
templates and entry groups from the Data Catalog to BigQuery.
"""
import datetime
from google.api_core.exceptions import PermissionDenied
from common.api import CloudAssetApiAdapter
from common.big_query import BigQueryAdapter
from common.entities import Project
from common.cloud_task import CloudTaskPublisher
from common.api.resource_manager_api_adapter import ResourceManagerApiAdapter
class TransferController:
    """
    Controller for the project-discovery step of the transfer job.

    It resolves the organization number of the configured project, fetches
    projects through the Cloud Asset adapter, merges duplicate entries and
    publishes one Cloud Task per project so that each project can be
    processed by the configured handler.
    """

    def __init__(self, app_config: dict) -> None:
        """
        Initializes the controller and its API clients from the app config.

        Args:
            app_config: Application configuration. The keys "project_name",
                "service_location", "handler_name" and "queue" are read
                here; the whole mapping is also passed to BigQueryAdapter.

        Raises:
            KeyError: If one of the required keys is missing.
            PermissionDenied: If the organization number of the project
                cannot be resolved.
        """
        self.project = app_config["project_name"]
        self.location = app_config["service_location"]
        self.handler_name = app_config["handler_name"]
        self.queue = app_config["queue"]

        # The resource manager client must exist before the organization
        # number is resolved, and the Cloud Asset adapter needs that number.
        self._resource_manager_client = ResourceManagerApiAdapter()
        self.organization_number = self._get_organization_number(self.project)
        self.api_client = CloudAssetApiAdapter(self.organization_number)

        # NOTE(review): big_query_client is not used by any method of this
        # class; it is kept because external code may read the attribute.
        self.big_query_client = BigQueryAdapter(app_config)
        self.cloud_task_client = CloudTaskPublisher(
            self.project, self.location, self.queue
        )

    def _get_organization_number(self, project):
        """
        Resolves the organization number for the given project.

        Args:
            project: Identifier of the project to look up.

        Returns:
            The value returned by the resource manager adapter for the
            project.

        Raises:
            PermissionDenied: Re-raised with a more descriptive message when
                the lookup is denied; the original error is chained.
        """
        try:
            return self._resource_manager_client.get_organization_number(
                project
            )
        except PermissionDenied as e:
            # Report the project that was actually looked up instead of
            # self.project, so the message is correct for any argument and
            # does not depend on instance state being initialized.
            raise PermissionDenied(
                f"Not enough permissions or {project} doesn't exists"
            ) from e

    def start_transfer(self) -> None:
        """
        Runs the job: fetches the projects, merges duplicate entries and
        creates one Cloud Task per remaining project.
        """
        projects = self.fetch_projects()
        projects = self.merge_projects(projects)
        self.create_cloud_tasks(projects)

    def fetch_projects(self) -> list[Project]:
        """
        Returns the projects reported by the Cloud Asset adapter.

        Which projects are included is decided by the adapter (presumably
        those with the Data Catalog or Dataplex API enabled - confirm in the
        adapter). The result may contain duplicates; see merge_projects.
        """
        return self.api_client.fetch_projects()

    def create_cloud_tasks(self, projects: list[Project]) -> None:
        """
        Creates one Cloud Task per project for further processing.

        The queue is created first when it does not exist yet. Every task
        payload is the project's dict representation extended with a
        "created_at" timestamp that is shared by all tasks of this call.

        Args:
            projects: Projects to publish tasks for.
        """
        if not self.cloud_task_client.check_queue_exists():
            self.cloud_task_client.create_queue()

        # Computed once so that all tasks of a run carry the same value.
        # datetime.today() yields a naive local-time timestamp.
        today = datetime.datetime.today().isoformat()
        for project in projects:
            payload = project.to_dict()
            # TODO: move "created_at" to constants.py
            payload["created_at"] = today
            self.cloud_task_client.create_task(
                payload,
                self.handler_name,
                self.project,
                self.location,
            )

    @staticmethod
    def merge_projects(projects: list[Project]) -> list[Project]:
        """
        Removes duplicates from the API response.

        Projects are keyed by project_id. The first occurrence of an id is
        kept (and updated in place); for every later occurrence its
        dataplex_api_enabled / data_catalog_api_enabled flags are OR-ed into
        the kept project.

        Args:
            projects: Projects, possibly containing several entries per id.

        Returns:
            One project per distinct project_id, in first-seen order.
        """
        merged = {}
        for project in projects:
            existing = merged.get(project.project_id)
            if existing is None:
                merged[project.project_id] = project
                continue

            # Read both combined flags before mutating the kept project.
            dataplex_api = (
                project.dataplex_api_enabled or existing.dataplex_api_enabled
            )
            datacatalog_api = (
                project.data_catalog_api_enabled
                or existing.data_catalog_api_enabled
            )
            existing.set_dataplex_api_enabled(dataplex_api)
            existing.set_data_catalog_api_enabled(datacatalog_api)
        return list(merged.values())