5-app-infra/6-service-catalog-solutions/helpers/pubsub-job-emulator/simple-pubsub-job.py (87 lines of code) (raw):

from __future__ import absolute_import import time import argparse import json import logging from google.cloud import pubsub_v1, secretmanager import tink from tink import aead from tink import cleartext_keyset_handle from tink.integration import gcpkms import base64 def publish_encrypted_messages(primitive, topic_path, input_file): with open(input_file, 'r') as file: for line in file: record = json.loads(line.strip()) message_data = json.dumps(record) ciphertext = primitive.encrypt(message_data.encode('utf-8'), b'') encrypted_message = base64.b64encode(ciphertext) future = publisher.publish(topic_path, encrypted_message) future.result() # Block until the message is published print(future.result()) time.sleep(1) # Sleep for a second before publishing another message if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) parser = argparse.ArgumentParser() parser.add_argument( '--cryptoKeyName', required=False, default="projects/prj-c-kms-fc4b/locations/us-central1/keyRings/sample-keyring/cryptoKeys/deidenfication_key_common", help=( 'GCP KMS Key URI as' 'projects/<PROJECT_ID>/locations/<LOCATION>/keyRings/<KEY_RING>/cryptoKeys/<KEY_NAME>' ) ) parser.add_argument( '--wrappedKey', required=False, default="projects/826586300218/secrets/kms-wrapper/versions/5", help=( 'Keyset base64 encoded wrapped key from Secret Manager' 'projects/<PROJECT_ID>/secrets/<SECRET_NAME>/versions/<VERSION>' ) ) parser.add_argument( '--topic_id', required=False, default="data_ingestion", help=( 'Pub/Sub Topic to publish messages to' '<TOPIC_ID>' ) ) parser.add_argument( '--messages_file', required=False, default="/helpers/sample-generator/sample-100-raw.json", help=( 'Sample json file containing messages in json format as:' '<PATH_TO_FILE>/<FILE_NAME>.json' ) ) parser.add_argument( '--project_id', required=False, default="prj-d-bu4-domain-1-ngst-ay3k", help=( 'GCP Project ID where pubsub is running' ) ) args = parser.parse_args() # Initialize Pub/Sub publisher publisher = pubsub_v1.PublisherClient() # Set up variables from args input_file = args.messages_file # Path to the input JSON file topic_path = publisher.topic_path(args.project_id, args.topic_id) # Initialize Tink aead.register() # Set up variables from args cryptoKeyName = f'gcp-kms://{args.cryptoKeyName}' wrappedKey = args.wrappedKey # Initialize the GCP KMS client gcp_client = gcpkms.GcpKmsClient(key_uri=cryptoKeyName, credentials_path=None) kms_aead = gcp_client.get_aead(cryptoKeyName) # Initialize the Secret Manager client secret_client = secretmanager.SecretManagerServiceClient() # Access the secret version response = secret_client.access_secret_version(name=wrappedKey) wrapped_keyset = response.payload.data.decode("UTF-8") wrapped_key = json.loads(response.payload.data)['encryptedKeyset'].encode('utf-8') # Extract the encrypted keyset and decrypt it encrypted_keyset = base64.b64decode(wrapped_key) decrypted_keyset = kms_aead.decrypt(encrypted_keyset, b'') # Load the decrypted keyset into a Tink keyset handle decrypted_keyset_handle = cleartext_keyset_handle.read(tink.BinaryKeysetReader(decrypted_keyset)) # Get the AEAD primitive aead_primitive = decrypted_keyset_handle.primitive(aead.Aead) # Publish sample messages to Pub/Sub topic publish_encrypted_messages(primitive=aead_primitive, input_file=input_file, topic_path=topic_path)