python/pipelines/components/pubsub/component.py (35 lines of code) (raw):

# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from typing import Optional from kfp.dsl import component, Input, Dataset import os import yaml config_file_path = os.path.join(os.path.dirname( __file__), '../../../../config/config.yaml') base_image = None if os.path.exists(config_file_path): with open(config_file_path, encoding='utf-8') as fh: configs = yaml.full_load(fh) vertex_components_params = configs['vertex_ai']['components'] repo_params = configs['artifact_registry']['pipelines_docker_repo'] # target_image = f"{repo_params['region']}-docker.pkg.dev/{repo_params['project_id']}/{repo_params['name']}/{vertex_components_params['image_name']}:{vertex_components_params['tag']}" base_image = f"{repo_params['region']}-docker.pkg.dev/{repo_params['project_id']}/{repo_params['name']}/{vertex_components_params['base_image_name']}:{vertex_components_params['base_image_tag']}" @component(base_image=base_image) def send_pubsub_activation_msg( project: str, topic_name: str, activation_type: str, predictions_table: Input[Dataset] ) -> None: """ This function sends a Pub/Sub message to trigger the activation application. Args: project: The Google Cloud project ID. topic_name: The name of the Pub/Sub topic to send the message to. activation_type: The type of activation message to send. predictions_table: The BigQuery table containing the predictions to be activated. Returns: None """ import json import logging from google.cloud import pubsub publisher = pubsub.PublisherClient() logging.info(f'Publishing message to topic {topic_name}') # References an existing topic topic_path = publisher.topic_path(project, topic_name) message_json = json.dumps({ "activation_type": activation_type, "source_table": predictions_table.metadata['table_id'], "predictions_column": predictions_table.metadata['predictions_column'] }) message_bytes = message_json.encode('utf-8') # Publishes a message publish_future = publisher.publish(topic_path, data=message_bytes) publish_future.result() # Verify the publish succeeded logging.info('Message published.')