in SecretsManagerRDSSQLServerRotationMultiUser/lambda_function.py [0:0]
# Lists every database role the given principal is a member of.
_ROLE_MEMBERSHIP_QUERY = (
    "SELECT roleprin.name FROM sys.database_role_members rolemems "
    "JOIN sys.database_principals roleprin ON roleprin.principal_id = rolemems.role_principal_id "
    "JOIN sys.database_principals userprin ON userprin.principal_id = rolemems.member_principal_id "
    "WHERE userprin.name = %s"
)

# Lists every explicit permission held by the given principal. Each LEFT JOIN resolves
# perm.major_id against one kind of securable; only the columns matching perm.class are
# meaningful for a given row.
_PERMISSIONS_QUERY = (
    "SELECT "
    "class = perm.class, "
    "state_desc = perm.state_desc, "
    "perm_name = perm.permission_name, "
    "schema_name = permschem.name, "
    "obj_name = obj.name, "
    "obj_schema_name = objschem.name, "
    "col_name = col.name, "
    "imp_name = imp.name, "
    "imp_type = imp.type, "
    "assembly_name = assembly.name, "
    "type_name = types.name, "
    "type_schema = typeschem.name, "
    "schema_coll_name = schema_coll.name, "
    "xml_schema = xmlschem.name, "
    "msg_type_name = msg_type.name, "
    "contract_name = contract.name, "
    "svc_name = svc.name, "
    "binding_name = binding.name, "
    "route_name = route.name, "
    "catalog_name = catalog.name, "
    "symkey_name = symkey.name, "
    "cert_name = cert.name, "
    "asymkey_name = asymkey.name "
    "FROM sys.database_permissions perm "
    "JOIN sys.database_principals prin ON perm.grantee_principal_id = prin.principal_id "
    "LEFT JOIN sys.schemas permschem ON permschem.schema_id = perm.major_id "
    "LEFT JOIN sys.objects obj ON obj.object_id = perm.major_id "
    "LEFT JOIN sys.schemas objschem ON objschem.schema_id = obj.schema_id "
    "LEFT JOIN sys.columns col ON col.object_id = perm.major_id AND col.column_id = perm.minor_id "
    "LEFT JOIN sys.database_principals imp ON imp.principal_id = perm.major_id "
    "LEFT JOIN sys.assemblies assembly ON assembly.assembly_id = perm.major_id "
    "LEFT JOIN sys.types types ON types.user_type_id = perm.major_id "
    "LEFT JOIN sys.schemas typeschem ON typeschem.schema_id = types.schema_id "
    "LEFT JOIN sys.xml_schema_collections schema_coll ON schema_coll.xml_collection_id = perm.major_id "
    "LEFT JOIN sys.schemas xmlschem ON xmlschem.schema_id = schema_coll.schema_id "
    "LEFT JOIN sys.service_message_types msg_type ON msg_type.message_type_id = perm.major_id "
    "LEFT JOIN sys.service_contracts contract ON contract.service_contract_id = perm.major_id "
    "LEFT JOIN sys.services svc ON svc.service_id = perm.major_id "
    "LEFT JOIN sys.remote_service_bindings binding ON binding.remote_service_binding_id = perm.major_id "
    "LEFT JOIN sys.routes route ON route.route_id = perm.major_id "
    "LEFT JOIN sys.fulltext_catalogs catalog ON catalog.fulltext_catalog_id = perm.major_id "
    "LEFT JOIN sys.symmetric_keys symkey ON symkey.symmetric_key_id = perm.major_id "
    "LEFT JOIN sys.certificates cert ON cert.certificate_id = perm.major_id "
    "LEFT JOIN sys.asymmetric_keys asymkey ON asymkey.asymmetric_key_id = perm.major_id "
    "WHERE prin.name = %s"
)

# Securables addressed by a single name:
# permission class -> (T-SQL securable keyword, result column holding the name)
_SINGLE_PART_SECURABLES = {
    3: ("SCHEMA", "schema_name"),
    5: ("ASSEMBLY", "assembly_name"),
    15: ("MESSAGE TYPE", "msg_type_name"),
    16: ("CONTRACT", "contract_name"),
    17: ("SERVICE", "svc_name"),
    18: ("REMOTE SERVICE BINDING", "binding_name"),
    19: ("ROUTE", "route_name"),
    23: ("FULLTEXT CATALOG", "catalog_name"),
    24: ("SYMMETRIC KEY", "symkey_name"),
    25: ("CERTIFICATE", "cert_name"),
    26: ("ASYMMETRIC KEY", "asymkey_name"),
}

# Securables addressed by a schema-qualified name:
# permission class -> (T-SQL securable keyword, schema column, name column)
_SCHEMA_SCOPED_SECURABLES = {
    1: ("OBJECT", "obj_schema_name", "obj_name"),
    6: ("TYPE", "type_schema", "type_name"),
    10: ("XML SCHEMA COLLECTION", "xml_schema", "schema_coll_name"),
}

# Class 4 (database principal) permissions:
# principal type returned as imp_type -> T-SQL securable keyword
_PRINCIPAL_SECURABLES = {
    'S': "USER",  # SQL User
    'R': "ROLE",  # Role
    'A': "APPLICATION ROLE",  # Application Role
}


def _quote_identifier(name):
    """Return name as a bracket-delimited T-SQL identifier, doubling any ']' it contains."""
    if name is None:
        # A NULL here means the securable could not be resolved by the permissions query;
        # interpolating it would emit the literal text "None" into the statement.
        raise ValueError("Missing name for database identifier")
    return "[%s]" % str(name).replace("]", "]]")


def _build_permission_clause(row):
    """Return the '<permission> [ON <class>::<securable>]' clause for one permissions row."""
    perm_class = row['class']

    if perm_class == 0:  # Database permission: no securable to name
        return row['perm_name']

    if perm_class == 4:  # Impersonation (Database Principal)
        keyword = _PRINCIPAL_SECURABLES.get(row['imp_type'])
        if keyword is None:
            raise ValueError("Invalid database principal permission type %s" % row['imp_type'])
        return "%s ON %s::%s" % (row['perm_name'], keyword, _quote_identifier(row['imp_name']))

    if perm_class in _SINGLE_PART_SECURABLES:
        keyword, name_col = _SINGLE_PART_SECURABLES[perm_class]
        return "%s ON %s::%s" % (row['perm_name'], keyword, _quote_identifier(row[name_col]))

    if perm_class in _SCHEMA_SCOPED_SECURABLES:
        keyword, schema_col, name_col = _SCHEMA_SCOPED_SECURABLES[perm_class]
        clause = "%s ON %s::%s.%s" % (
            row['perm_name'],
            keyword,
            _quote_identifier(row[schema_col]),
            _quote_identifier(row[name_col]),
        )
        # Only object permissions can be narrowed to a single column
        if perm_class == 1 and row['col_name']:
            clause = "%s (%s)" % (clause, _quote_identifier(row['col_name']))
        return clause

    raise ValueError("Invalid database permission class %s" % row['class'])


def apply_database_permissions(cursor, current_user, pending_user):
    """Runs various SQL statements to apply the database permissions from current_user to pending_user

    This helper function copies the role memberships of current_user to pending_user and then replays
    every permission row found for current_user (GRANT, DENY, ...) against pending_user.

    Every identifier placed into a generated statement (role, securable, column and user names) is
    bracket-delimited with ']' doubled, so names that are not regular T-SQL identifiers (spaces,
    reserved words, special characters) are handled correctly and cannot alter the statement.
    pending_user is therefore expected to be the plain, unquoted user name.

    Args:
        cursor (pymssql.Cursor): The pymssql Cursor object; rows must be addressable by column name

        current_user (string): The current username

        pending_user (string): The pending username

    Raises:
        pymssql.OperationalError: If there are any errors running the SQL statements

        ValueError: If any database values were unexpected/invalid
    """
    quoted_pending_user = _quote_identifier(pending_user)

    # Get the roles assigned to the current user and assign each one to the pending user
    cursor.execute(_ROLE_MEMBERSHIP_QUERY, current_user)
    for row in cursor.fetchall():
        cursor.execute("ALTER ROLE %s ADD MEMBER %s" % (_quote_identifier(row['name']), quoted_pending_user))

    # Loop through the database permissions and apply them to the pending user
    cursor.execute(_PERMISSIONS_QUERY, current_user)
    for row in cursor.fetchall():
        permission = _build_permission_clause(row)

        # GRANT_WITH_GRANT_OPTION is a catalog state, not a statement keyword, so it is
        # spelled out; every other state (e.g. GRANT, DENY) is used as the verb directly.
        if row['state_desc'] == 'GRANT_WITH_GRANT_OPTION':
            sql_stmt = "GRANT %s TO %s WITH GRANT OPTION" % (permission, quoted_pending_user)
        else:
            sql_stmt = "%s %s TO %s" % (row['state_desc'], permission, quoted_pending_user)

        cursor.execute(sql_stmt)