scenario-notebooks/UserSecurityMetadata/Entities.py (87 lines of code) (raw):
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
from validate_email import validate_email
from Utils import executeProcess
from NodeEdge import Node, DrawableNode
from functools import lru_cache
class User(DrawableNode):
def __init__(self, name, email, objectId):
self.name = name
self.email = email
self.objectId = objectId
def getNode(self):
return Node(self.objectId, self.name, "User")
@staticmethod
@lru_cache(maxsize=100)
def getUserById(userId):
rawoutput = executeProcess(
f'az ad user show --id {userId} --query [displayName,mail,objectId] --output tsv'.split(' '))
output = rawoutput.split('\n')
if len(output) != 3:
raise Exception(
f'Unable to get AAD User with Id - {userId}. Error - {rawoutput}')
else:
user = User(output[0], output[1], output[2])
return user
@staticmethod
@lru_cache(maxsize=100)
def getUserByEmail(userEmail):
rawoutput = executeProcess(
f'az ad user list --filter startswith(mail,\'{userEmail}\') --query [0].{{Name:displayName,Email:mail,ObjectId:objectId}} --output tsv'.split(' '))
output = rawoutput.split('\t')
if len(output) != 3:
raise Exception(
f'Not found - User with email - {userEmail}.')
else:
user = User(output[0], output[1], output[2])
return user
@staticmethod
def getUserByIdOrEmail(userIdOrEmail):
try:
return User.getUserById(userIdOrEmail)
except:
isvalidEmail = validate_email(userIdOrEmail)
if isvalidEmail:
return User.getUserByEmail(userIdOrEmail)
raise
class Group(DrawableNode):
def __init__(self, name, email, groupId):
self.name = name
self.email = email
self.groupId = groupId
def getNode(self):
return Node(self.groupId, self.name, "Group")
@staticmethod
@lru_cache(maxsize=100)
def getGroupById(groupId):
rawoutput = executeProcess(
f'az ad group show --group {groupId} --query [displayName,mail,objectId] --output tsv'.split(' '))
output = rawoutput.split('\n')
if len(output) != 3:
raise Exception(
f'Unable to get AAD Group with Id - {groupId}. Error - {rawoutput}')
else:
group = Group(output[0], output[1], output[2])
return group
class ServicePrincipal(DrawableNode):
def __init__(self, name, objectId):
self.name = name
self.objectId = objectId
def getNode(self):
return Node(self.objectId, self.name, "ServicePrincipal")
@staticmethod
@lru_cache(maxsize=100)
def getServicePrincipalById(objectId):
rawoutput = executeProcess(
f'az ad sp show --id {objectId} --query [displayName,objectId] --output tsv'.split(' '))
output = rawoutput.split('\n')
if len(output) != 2:
raise Exception(
f'Unable to get AAD ServicePrincipal with Id - {objectId}. Error - {rawoutput}')
else:
sp = ServicePrincipal(output[0], output[1])
return sp
class Subscription(DrawableNode):
def __init__(self, name, subId):
self.name = name
self.subId = subId
def getNode(self):
return Node(self.subId, self.name, "AzureSubscription")