helpers/wrapped-key/wrapped_key.py (86 lines of code) (raw):

# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import argparse import base64 import crcmod import six from google.cloud import kms import google.auth from google.auth import impersonated_credentials def encrypt_symmetric(project_id, location_id, key_ring_id, key_id, client): """ Encrypt securely generated random bytes using a confidential computing symmetric key. Args: project_id (string): Google Cloud project ID. location_id (string): Cloud KMS location. key_ring_id (string): ID of the Cloud KMS key ring. key_id (string): ID of the key to use. client (KeyManagementServiceClient): Google Cloud Key Management Service. Returns: bytes: Encrypted ciphertext. """ # Generate random bytes. plaintext_bytes = generate_random_bytes( project_id, location_id, 32, client) # Optional, but recommended: compute plaintext's CRC32C. # See crc32c() function defined below. plaintext_crc32c = crc32c(plaintext_bytes) # Build the key name. key_name = client.crypto_key_path( project_id, location_id, key_ring_id, key_id) # Call the API. encrypt_response = client.encrypt( request={'name': key_name, 'plaintext': plaintext_bytes, 'plaintext_crc32c': plaintext_crc32c}) # Optional, but recommended: perform integrity verification # on encrypt_response. # For more details on ensuring E2E in-transit integrity to # and from Cloud KMS visit: # https://cloud.google.com/kms/docs/data-integrity-guidelines if not encrypt_response.verified_plaintext_crc32c: raise Exception( 'The request sent to the server was corrupted in-transit.') if not encrypt_response.ciphertext_crc32c == \ crc32c(encrypt_response.ciphertext): raise Exception( 'The response received from the server was corrupted in-transit.') # End integrity verification return encrypt_response def generate_random_bytes(project_id, location_id, num_bytes, client): """ Generate random bytes with entropy sourced from the given location. Args: project_id (string): Google Cloud project ID (e.g. 'my-project'). location_id (string): Cloud KMS location (e.g. 'us-east1'). num_bytes (integer): number of bytes of random data. client (KeyManagementServiceClient): Google Cloud Key Management Service. Returns: bytes: Encrypted ciphertext. """ # Build the location name. location_name = client.common_location_path(project_id, location_id) # Call the API. protection_level = kms.ProtectionLevel.HSM random_bytes_response = client.generate_random_bytes( request={'location': location_name, 'length_bytes': num_bytes, 'protection_level': protection_level}) return random_bytes_response.data def crc32c(data): """ Calculates the CRC32C checksum of the provided data. Args: data: the bytes over which the checksum should be calculated. Returns: An int representing the CRC32C checksum of the provided bytes. """ crc32c_fun = crcmod.predefined.mkPredefinedCrcFun('crc-32c') return crc32c_fun(six.ensure_binary(data)) if __name__ == '__main__': parser = argparse.ArgumentParser( description='Encrypt securely generated random bytes ' 'using a symmetric key.') group1 = parser.add_argument_group("Crypto Key Self link") group2 = parser.add_argument_group("Crypto Key parameters") group3 = parser.add_argument_group("Service Account to be impersonated") group2.add_argument('--project_id', dest='project_id', help='project_id (string): Google Cloud project ID.') group2.add_argument('--location_id', dest='location_id', help='location_id (string): Cloud KMS location.') group2.add_argument('--key_ring_id', dest='key_ring_id', help="key_ring_id (string): ID of the" "Cloud KMS key ring.") group2.add_argument('--key_id', dest='key_id', help='key_id (string): ID of the key to use.') group1.add_argument('--crypto_key_path', dest='crypto_key_path', help='crypto_key_path (string): ' 'Crypto key path to use. ' 'Expected format: projects/PROJECT-ID/' 'locations/LOCATION-ID' '/keyRings/KEY-RING-ID/cryptoKeys/KEY-ID') group3.add_argument('--service_account', dest='service_account', help='service_account (string): ' 'Service Account to be impersonated.') args = parser.parse_args() # Create the client. if args.service_account is not None: target_scopes = ['https://www.googleapis.com/auth/cloud-platform'] source_credentials, project = google.auth.default( scopes=target_scopes) target_credentials = impersonated_credentials.Credentials( source_credentials=source_credentials, target_principal=args.service_account, target_scopes=target_scopes, lifetime=10) client = kms.KeyManagementServiceClient(credentials=target_credentials) else: client = kms.KeyManagementServiceClient() if args.crypto_key_path is not None: key_ring_args = client.parse_crypto_key_path(args.crypto_key_path) project_id = key_ring_args['project'] location_id = key_ring_args['location'] key_ring_id = key_ring_args['key_ring'] key_id = key_ring_args['crypto_key'] else: project_id = args.project_id location_id = args.location_id key_ring_id = args.key_ring_id key_id = args.key_id encrypt_response = encrypt_symmetric(project_id, location_id, key_ring_id, key_id, client) print(base64.b64encode(encrypt_response.ciphertext).decode("utf-8"))