services/handlers/fetch_resources/main.py

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module defines a server application using the Flask framework.

It is designed to handle cloud tasks related to Google Cloud Platform (GCP)
resources, specifically for managing entry groups and tag templates in a
data catalog.
"""
from flask import Flask, request, jsonify
from asgiref.wsgi import WsgiToAsgi
import uvicorn
import json
from datetime import datetime

from googleapiclient.errors import HttpError

from common.api.resource_manager_api_adapter import ResourceManagerApiAdapter
from common.exceptions.exceptions import ValidationError
from common.api import DatacatalogApiAdapter
from common.big_query import BigQueryAdapter
from common.cloud_task import CloudTaskPublisher
from common.utils import get_logger
from config import get_application_config

app = Flask(__name__)
asgi_app = WsgiToAsgi(app)
logger = get_logger()


class CloudTaskHandler:
    """
    Handles cloud tasks, validates task data, and manages the lifecycle
    of the application.

    This class interacts with GCP resources to manage entry groups and
    tag templates.
    """

    def __init__(self, app_config):
        """
        Initializes the CloudTaskHandler with configuration for the GCP
        project and other settings.
        """
        self.project = app_config["project_name"]
        self.service_location = app_config["service_location"]
        self.queue = app_config["queue"]
        self.handler_name = app_config["handler_name"]
        self.dataset_name = app_config["dataset_name"]
        self._resource_manager_client = ResourceManagerApiAdapter()
        self.api_client = DatacatalogApiAdapter()
        self.big_query_client = BigQueryAdapter(app_config)
        self.cloud_task_publisher = CloudTaskPublisher(
            self.project, self.service_location, self.queue
        )

    def _validate_task_data(self, task_data):
        """
        Validates the incoming task data against required fields and types.
""" required_fields = { "scope": str, "type": str, "nextPageToken": (str, type(None)), "is_transferred": bool, "createdAt": str, } missing_fields = [ field for field in required_fields if field not in task_data ] if missing_fields: raise ValidationError( f"Missing required fields: {', '.join(missing_fields)}" ) for field, expected_types in required_fields.items(): if isinstance(expected_types, tuple): if not isinstance(task_data[field], expected_types): expected_type_names = ", ".join( t.__name__ for t in expected_types ) raise ValidationError( f"Invalid type for '{field}', " f"expected one of: {expected_type_names}" ) else: if not isinstance(task_data[field], expected_types): raise ValidationError( f"Invalid type for '{field}', " f"expected {expected_types.__name__}" ) try: self._resource_manager_client.get_project_number(self.project) except HttpError as e: raise ValidationError( f"Not enough permissions or {self.project} doesn't exists" ) from e if task_data["type"] not in ["entry_group", "tag_template"]: raise ValidationError("Invalid entry type for 'type'") date_format = "%Y-%m-%d" try: datetime.strptime(task_data["createdAt"], date_format) except ValueError as exc: raise ValidationError( "Invalid date format for 'createdAt'. " f"Expected format: {date_format}" ) from exc if task_data["type"] == "tag_template": if "is_public" not in task_data: raise ValidationError( "Missing required field: 'is_public' for tag_template" ) if not isinstance(task_data["is_public"], bool): raise ValidationError( "Invalid type for 'is_public', expected bool" ) async def handle_cloud_task(self, task_data): """ Processes the task data, interacts with the Datacatalog API, and writes results to BigQuery. """ try: self._validate_task_data(task_data) except ValidationError as e: return jsonify({"error": str(e)}), 400 match task_data["type"]: case "entry_group": api_result, next_page_token = ( self.api_client.search_entry_groups( [task_data["scope"]], task_data["is_transferred"], page_token=task_data["nextPageToken"], ) ) case "tag_template": api_result, next_page_token = ( self.api_client.search_tag_templates( [task_data["scope"]], task_data["is_public"], task_data["is_transferred"], page_token=task_data["nextPageToken"], ) ) case _: api_result, next_page_token = (None, None) if api_result: converted_date = datetime.strptime( task_data["createdAt"], "%Y-%m-%d" ).date() self.big_query_client.write_to_table( f"{self.project}.{self.dataset_name}.{task_data["type"]}s", api_result, converted_date, ) if next_page_token: payload = task_data.copy() payload["nextPageToken"] = next_page_token self.cloud_task_publisher.create_task( payload, self.handler_name, self.project, self.service_location ) return jsonify({"message": "Task processed"}), 200 @app.route('/', methods=['POST', 'PUT']) async def process_task(): """ Route to process cloud tasks. """ task_data = json.loads(request.data) config = get_application_config() handler = CloudTaskHandler(config) return await handler.handle_cloud_task(task_data) if __name__ == "__main__": uvicorn.run(asgi_app, host="0.0.0.0", port=8080)