in tools/k8s-rbac-policies/rbac_patch.py [0:0]
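def patchRole(roleName, namespace, extraRules, skipConfirmation=False):
    """Append ``extraRules`` to the Kubernetes Role ``roleName`` in ``namespace``.

    The current Role is fetched with ``kubectl get role``; every entry of
    ``extraRules`` that is not already covered by a single existing rule
    (its apiGroups, resources and verbs each a subset of that rule's) is
    appended, and the result is written to a temporary manifest in the
    working directory. Unless ``skipConfirmation`` is true, ``kubectl diff``
    of that manifest is shown and the operator must answer ``y`` before
    ``kubectl apply`` runs. Since this widens RBAC grants, that diff/confirm
    step is the only review gate; ``skipConfirmation=True`` removes it.

    Args:
        roleName: Name of the Role to patch.
        namespace: Namespace the Role lives in.
        extraRules: List of rule dicts with ``apiGroups``, ``resources`` and
            ``verbs`` lists, shaped like ``Role.rules`` entries.
        skipConfirmation: When true, apply without showing a diff or prompting.

    Returns:
        ``False`` when the Role cannot be fetched (not found, or ``kubectl``
        printed something that is not JSON); ``None`` otherwise.
    """
    # Build argv as a list rather than splitting a formatted string, so a
    # name or namespace containing whitespace cannot turn into extra args.
    msg = doTerminalCmd(
        ["kubectl", "get", "role", roleName, "-n", namespace, "--output", "json"]
    )
    if "(NotFound)" in msg and "Error" in msg:
        print(msg)
        return False
    try:
        role = json.loads(msg)
    except ValueError:
        # Other kubectl failures (forbidden, unreachable cluster, ...) are
        # not JSON either; report them instead of crashing on the decode.
        print(msg)
        return False

    def _covers(existing, wanted):
        """True if ``existing`` literally grants everything ``wanted`` lists.

        Comparison is by exact string membership: a ``"*"`` wildcard in the
        existing rule is not expanded, so such a rule is treated as not
        covering a narrower request (yielding a redundant, not a wider, grant).
        """
        return (
            set(wanted["apiGroups"]) <= set(existing["apiGroups"])
            and set(wanted["resources"]) <= set(existing["resources"])
            and set(wanted["verbs"]) <= set(existing["verbs"])
        )

    existingRules = role["rules"]
    # Keep only the extra rules that no single existing rule already covers;
    # every extra rule is compared against every existing rule.
    rulesToAssign = [
        extraRule
        for extraRule in extraRules
        if not any(_covers(rule, extraRule) for rule in existingRules)
    ]

    prompt_text = "Apply Changes?"
    if not rulesToAssign:
        print(f"The role {roleName} seems to already have the necessary permissions!")
        prompt_text = "Proceed anyways?"
    role["rules"].extend(rulesToAssign)

    # Server-populated metadata must not be sent back on apply.
    for key in ("creationTimestamp", "resourceVersion", "uid"):
        delete_if_exists(role, key)
    new_role = json.dumps(role, indent=3)

    # Random suffix so concurrent runs in the same directory cannot collide.
    filename = f"Role-{roleName}-New_Permissions-{uuid.uuid4()}-TemporaryFile.json"
    try:
        with open(filename, "w+", encoding="utf-8") as f:
            f.write(new_role)
            f.flush()
            prompt = "y"
            if not skipConfirmation:
                # The manifest path is passed explicitly so the diff and the
                # apply operate on the file just written.
                diff = doTerminalCmd(["kubectl", "diff", "-f", filename])
                prompt = input(f"{diff}\n{prompt_text} y/n: ").lower().strip()
                while prompt not in ("y", "n"):
                    prompt = input("Please make a valid selection. y/n: ").lower().strip()
            if prompt == "y":
                print(doTerminalCmd(["kubectl", "apply", "-f", filename]))
    except Exception as e:
        print(e)
    finally:
        # Always drop the manifest, including after Ctrl-C at the prompt or a
        # failed open(); it may never have been created, so a missing file is
        # not an error here.
        try:
            os.remove(filename)
        except FileNotFoundError:
            pass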