common/cloud_task/cloud_task_publisher.py (96 lines of code) (raw):

# Copyright 2025 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ This module provides functionality to create and publish tasks to Google Cloud Tasks. The module uses Google Cloud Tasks API to manage task creation and dispatch. """ from common.utils import get_logger from common.api.resource_manager_api_adapter import ResourceManagerApiAdapter import google.cloud.tasks_v2 as tasks from google.cloud.tasks_v2 import Queue, RateLimits from google.api_core.exceptions import NotFound import google.auth.transport.requests import google.oauth2.id_token import json import time class CloudTaskPublisher(object): """ A publisher class for creating and submitting tasks to Google Cloud Tasks. """ def __init__(self, project: str, location: str, queue: str, max_rps: int = 60): """ Initializes the CloudTaskPublisher with the necessary configuration. """ self.project = project self.location = location self.queue_name = queue self.max_rps = max_rps self._wait_after_queue_creation = 30 self._cloud_task_client = tasks.CloudTasksClient() self._resource_manager_client = ResourceManagerApiAdapter() self._queue_fqn = self._cloud_task_client.queue_path( self.project, self.location, self.queue_name ) self._logger = get_logger() def create_task( self, json_payload: dict | list, service_name: str, project: str = None, location: str = None, ) -> tasks.Task: """ Creates a task with a JSON payload and adds it to the specified queue. """ project = project or self.project location = location or self.location url = self._form_service_url(service_name, project, location) auth_req = google.auth.transport.requests.Request() id_token = google.oauth2.id_token.fetch_id_token(auth_req, url) http_request = tasks.HttpRequest( { "http_method": tasks.HttpMethod.POST, "url": url, "headers": { "Content-type": "application/json", "Authorization": f"Bearer {id_token}", }, "body": json.dumps(json_payload).encode(), } ) task = tasks.Task({"http_request": http_request}) create_request = tasks.CreateTaskRequest( { "parent": self._queue_fqn, "task": task, } ) try: task = self._cloud_task_client.create_task(create_request) self._logger.info(f"Created task. " f"Endpoint: {url}, " f"payload: {json.dumps(json_payload)}") except NotFound as e: self._logger.info(f"Queue {self._queue_fqn} does not exist") raise e return task def create_queue(self) -> Queue: """ Creates a queue with Google Cloud Queues. """ parent = f"projects/{self.project}/locations/{self.location}" rate_limits = RateLimits({ "max_dispatches_per_second": self.max_rps, }) queue = Queue({ "name": self._queue_fqn, "rate_limits": rate_limits }) result = self._cloud_task_client.create_queue( request={"parent": parent, "queue": queue} ) time.sleep(self._wait_after_queue_creation) self._logger.info(f"Created queue: {self._queue_fqn}") return result def check_queue_exists(self) -> bool: """ Checks if a queue exists. """ try: self._cloud_task_client.get_queue(name=self._queue_fqn) return True except NotFound: self._logger.info(f"Queue {self._queue_fqn} does not exist. " f"Queue will be created automatically.") return False def _form_service_url( self, service_name: str, project: str, location: str ) -> str: """ Form a service URL for cloud task. """ project_number = self._get_project_number(project) return (f"https://{service_name}-" f"{project_number}." f"{location}.run.app") def _get_project_number(self, project) -> str: """ Get the project number using project_id. """ return self._resource_manager_client.get_project_number(project)