def password_rotation_function()

in reference-architectures/automated-password-rotation/terraform/code/main.py [0:0]


def password_rotation_function(cloud_event: CloudEvent):
    """Background Cloud Function to be triggered by Pub/Sub.
    Args:
         cloud_event (CloudEvent):  Event of type
                                    google.cloud.pubsub.topic.v1.
                                    messagePublished
    """
    pubsub_message = base64.b64decode(cloud_event.data["message"]["data"]).decode()
    message_data = json.loads(pubsub_message)
    project_id = get_project_id()
    secret_id = message_data["secretid"]
    db_user = message_data["db_user"]
    db_name = message_data["db_name"]
    location = message_data["db_location"]
    instance_name = message_data["instance_name"]
    client = secretmanager.SecretManagerServiceClient()
    # Get and rotate the secret
    parent = f"projects/{project_id}/secrets/{secret_id}"
    response = client.access_secret_version(
        request={"name": f"{parent}/versions/latest"}
    )
    db_pass = response.payload.data.decode("UTF-8")
    new_db_pass = get_random_string(10)
    print("Updating the password in secret manager")
    update_secret_status = update_secret(project_id, secret_id, new_db_pass)
    if update_secret_status:
        print(f"Secret {secret_id} rotated successfully in Secret Manager!")
    else:
        print(f"Unable to update {secret_id} in Secret Manager")
        return update_secret_status
    reset_password_status = reset_password(
        instance_name, db_name, location, db_user, db_pass, new_db_pass
    )
    if reset_password_status:
        print("DB password changed successfully!")
    else:
        print("Unable to change password")
        # Add code to rollback the secret to the previous version
        # in secret manager if the password reset in the DB failed
        return reset_password_status
    verify = verify_change_password(
        instance_name, db_name, location, db_user, new_db_pass
    )
    if verify:
        print("DB password verified successfully!")
    else:
        print("Unable to verify password.")
    return verify