in reference-architectures/automated-password-rotation/terraform/code/main.py [0:0]
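def password_rotation_function(cloud_event: CloudEvent):
    """Background Cloud Function to be triggered by Pub/Sub.

    Rotates a database user's password in three steps: store a freshly
    generated password in Secret Manager, reset the database password to it
    (authenticating with the current one), then verify the new password
    works. If the database reset fails, the previous password is written
    back to Secret Manager as a new version so that ``versions/latest``
    keeps matching what the database accepts.

    Args:
        cloud_event (CloudEvent): Event of type
            google.cloud.pubsub.topic.v1.messagePublished. The message body
            is a base64-encoded JSON object with the keys ``secretid``,
            ``db_user``, ``db_name``, ``db_location`` and ``instance_name``.

    Returns:
        bool: True only when the secret was updated, the database password
        was reset and the new password was verified; False on the first
        step that fails.

    Raises:
        KeyError: if the Pub/Sub message lacks one of the required keys.
        json.JSONDecodeError: if the message body is not valid JSON.
    """
    pubsub_message = base64.b64decode(cloud_event.data["message"]["data"]).decode()
    message_data = json.loads(pubsub_message)
    project_id = get_project_id()
    # All required fields are read up front, so a malformed message fails
    # before anything in Secret Manager or the database is modified.
    secret_id = message_data["secretid"]
    db_user = message_data["db_user"]
    db_name = message_data["db_name"]
    location = message_data["db_location"]
    instance_name = message_data["instance_name"]

    client = secretmanager.SecretManagerServiceClient()
    # Get and rotate the secret
    parent = f"projects/{project_id}/secrets/{secret_id}"
    response = client.access_secret_version(
        request={"name": f"{parent}/versions/latest"}
    )
    # Current password, needed to authenticate the reset below.
    # Neither db_pass nor new_db_pass must ever be printed or logged.
    db_pass = response.payload.data.decode("UTF-8")
    new_db_pass = get_random_string(10)

    print("Updating the password in secret manager")
    update_secret_status = update_secret(project_id, secret_id, new_db_pass)
    if not update_secret_status:
        print(f"Unable to update {secret_id} in Secret Manager")
        return update_secret_status
    print(f"Secret {secret_id} rotated successfully in Secret Manager!")

    reset_password_status = reset_password(
        instance_name, db_name, location, db_user, db_pass, new_db_pass
    )
    if not reset_password_status:
        print("Unable to change password")
        # The secret's latest version now holds a password the database does
        # not accept. Write the previous password back as a new version so
        # consumers resolving ``versions/latest`` keep working. The stale
        # version remains in the secret's history; disabling or destroying
        # it is left to the operator.
        if update_secret(project_id, secret_id, db_pass):
            print(f"Secret {secret_id} rolled back to the previous password")
        else:
            print(f"Unable to roll back {secret_id} in Secret Manager")
        return reset_password_status
    print("DB password changed successfully!")

    verify = verify_change_password(
        instance_name, db_name, location, db_user, new_db_pass
    )
    if verify:
        print("DB password verified successfully!")
    else:
        # A failed verification after a successful reset is ambiguous
        # (transient connectivity vs. a real mismatch), so nothing is rolled
        # back here; the caller should treat a False return as needing
        # operator attention.
        print("Unable to verify password.")
    return verify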