reference-architectures/automated-password-rotation/terraform/code/main.py (136 lines of code) (raw):
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Automates Cloud SQL PostgreSQL password rotation via a Cloud Function.
This script defines a Google Cloud Function triggered by a Pub/Sub message.
The function orchestrates the rotation of a database user's password for a
Cloud SQL PostgreSQL instance. The password (secret) is managed using
Google Cloud Secret Manager.
Workflow:
1. Trigger: Receives a CloudEvent from a Pub/Sub topic. The message payload
(JSON format, base64 encoded) contains details like the Secret Manager
secret ID, database user, database name, Cloud SQL instance name, and
location.
2. Decode Message: Decodes the Pub/Sub message to extract necessary parameters.
3. Fetch Current Secret: Retrieves the current database password from the
specified secret ID in Secret Manager.
4. Generate New Password: Creates a new random password.
5. Update Secret Manager: Adds a new version to the secret in Secret Manager
containing the newly generated password.
6. Reset Database Password: Connects to the specified Cloud SQL instance
(using the Cloud SQL Python Connector and SQLAlchemy) using the *old*
password and executes an `ALTER USER` command to set the password to the
*new* one.
7. Verify New Password: Attempts to connect to the database using the *new*
password and executes a simple query to confirm the rotation was successful.
8. Logging: Prints status messages throughout the process.
Helper Functions:
- get_project_id: Retrieves the Google Cloud Project ID from the
metadata server.
- get_random_string: Generates a random string for the new password.
- update_secret: Handles adding a new version to a secret in Secret Manager.
- getconn: Establishes a database connection using the Cloud SQL Connector.
- reset_password: Connects to the database and executes the password change SQL.
- verify_change_password: Connects to the database with the new password to
verify it works.
"""
import base64
import json
import random
import string
import time
import urllib.request
import functions_framework
import sqlalchemy
from cloudevents.http import CloudEvent
from google.cloud import secretmanager
from google.cloud.sql.connector import Connector, IPTypes
from sqlalchemy.sql import text
@functions_framework.cloud_event
def password_rotation_function(cloud_event: CloudEvent):
"""Background Cloud Function to be triggered by Pub/Sub.
Args:
cloud_event (CloudEvent): Event of type
google.cloud.pubsub.topic.v1.
messagePublished
"""
pubsub_message = base64.b64decode(cloud_event.data["message"]["data"]).decode()
message_data = json.loads(pubsub_message)
project_id = get_project_id()
secret_id = message_data["secretid"]
db_user = message_data["db_user"]
db_name = message_data["db_name"]
location = message_data["db_location"]
instance_name = message_data["instance_name"]
client = secretmanager.SecretManagerServiceClient()
# Get and rotate the secret
parent = f"projects/{project_id}/secrets/{secret_id}"
response = client.access_secret_version(
request={"name": f"{parent}/versions/latest"}
)
db_pass = response.payload.data.decode("UTF-8")
new_db_pass = get_random_string(10)
print("Updating the password in secret manager")
update_secret_status = update_secret(project_id, secret_id, new_db_pass)
if update_secret_status:
print(f"Secret {secret_id} rotated successfully in Secret Manager!")
else:
print(f"Unable to update {secret_id} in Secret Manager")
return update_secret_status
reset_password_status = reset_password(
instance_name, db_name, location, db_user, db_pass, new_db_pass
)
if reset_password_status:
print("DB password changed successfully!")
else:
print("Unable to change password")
# Add code to rollback the secret to the previous version
# in secret manager if the password reset in the DB failed
return reset_password_status
verify = verify_change_password(
instance_name, db_name, location, db_user, new_db_pass
)
if verify:
print("DB password verified successfully!")
else:
print("Unable to verify password.")
return verify
def get_project_id():
"""Function to fetch project id.
Args:
None.
Returns:
project_id: Id of the project
"""
metadata_base_url = "http://metadata.google.internal/computeMetadata/v1/"
project_id_path = "project/project-id"
url = metadata_base_url + project_id_path
req = urllib.request.Request(url)
req.add_header("Metadata-Flavor", "Google")
project_id = urllib.request.urlopen(req).read().decode()
return project_id
def get_random_string(length):
"""Function to generate random string.
Args:
length: Length of the string to generate
Returns:
result_str: Random string
"""
# choose from all lowercase letter
letters = string.ascii_lowercase
result_str = "".join(random.choice(letters) for i in range(length))
return result_str
def update_secret(project_id, secret_id, new_secret_value):
"""Updates the value of a secret in Google Cloud Secret Manager.
Args:
project_id (str): Your Google Cloud Project ID.
secret_id (str): The ID of the secret to update.
new_secret_value (bytes): The new secret value as a bytes object.
"""
client = secretmanager.SecretManagerServiceClient()
# Build the secret path (required format for Secret Manager)
name = f"projects/{project_id}/secrets/{secret_id}"
# Prepare the payload with the updated secret data
payload = {"data": new_secret_value.encode("UTF-8")}
# Perform the update
try:
updated_secret = client.add_secret_version(
request={
"parent": name,
"payload": payload,
}
)
print(f"Updated secret {secret_id} to version: {updated_secret.name}")
return True
except Exception as e: # pylint: disable=broad-exception-caught
print(e)
return False
# Function to establish a database connection
def getconn(instance_connection_name, user, password, db):
"""Function to establish connection with CloudSQl.
Args:
instance_connection_name(string) : instance connection name
user (string) : postgressql user
password(string) : user password
db(string. : database
Returns:
conn : connection
"""
with Connector(ip_type=IPTypes.PRIVATE) as connector:
conn = connector.connect(
instance_connection_name,
"pg8000",
user=user,
password=password,
db=db,
)
return conn
def reset_password(instance_name, db_name, location, db_user, db_pass, new_db_pass):
"""Function to reset Postgresql user password.
Args:
instance_name(string) : Cloudsql instance name
db_name(string) : postgres db user
location(string) : Cloudsql instance location
db_user(string) : posygres db user
db_pass(string) : postgres db password
new_db_pass(string) : new password
Returns:
True/False(bool) : flag indicating success or failure in
resetting the password
"""
project_id = get_project_id()
instance_connection_name = project_id + ":" + location + ":" + instance_name
# Establish the connection pool
def conn():
return getconn(instance_connection_name, db_user, db_pass, db_name)
try:
pool = sqlalchemy.create_engine(
"postgresql+pg8000://",
creator=conn,
)
except Exception as e: # pylint: disable=broad-exception-caught
print(e)
try:
with pool.connect() as db_conn:
query = f"ALTER USER {db_user} WITH PASSWORD '{new_db_pass}'"
db_conn.execute(text(query))
db_conn.commit()
pool.dispose()
time.sleep(10)
return True
except Exception as e: # pylint: disable=broad-exception-caught
print("Failed to reset password due to the following exception")
print(e)
return False
def verify_change_password(instance_name, db_name, location, db_user, db_pass):
"""Function to test Postgresql user password.
Args:
instance_name(string) : Cloudsql instance name
db_name(string) : postgres db user
location(string) : Cloudsql instance location
db_user(string) : posygres db user
db_pass(string) : postgres db password
Returns:
True/False(bool) : flag indicating success or failure in
verifying the password
"""
project_id = get_project_id()
instance_connection_name = project_id + ":" + location + ":" + instance_name
# Establish the connection pool
def conn_verify():
return getconn(instance_connection_name, db_user, db_pass, db_name)
pool = sqlalchemy.create_engine(
"postgresql+pg8000://",
creator=conn_verify,
)
print("Verifying the new password works by executing a sample query")
try:
with pool.connect() as db_conn:
results = db_conn.execute(
sqlalchemy.text("select * from information_schema.tables limit 1;")
).fetchall()
for row in results:
print(row)
return True
except Exception as e: # pylint: disable=broad-exception-caught
print("Cannot verify that the new password due to the following exception")
print(e)
return False