tools/k8s-rbac-policies/rbac_patch.py (115 lines of code) (raw):
import os
import subprocess as sp
import tempfile as temp
import json
import argparse
import uuid
def delete_if_exists(dictionary: dict, key: str):
if dictionary.get(key, None) is not None:
del dictionary[key]
def doTerminalCmd(cmd):
with temp.TemporaryFile() as f:
process = sp.Popen(cmd, stdout=f, stderr=f)
process.wait()
f.seek(0)
msg = f.read().decode()
return msg
def patchRole(roleName, namespace, extraRules, skipConfirmation=False):
cmd = f"kubectl get role {roleName} -n {namespace} --output json".split(" ")
msg = doTerminalCmd(cmd)
if "(NotFound)" in msg and "Error" in msg:
print(msg)
return False
role = json.loads(msg)
rules = role["rules"]
rulesToAssign = extraRules[::]
passedRules = []
for rule in rules:
apiGroups = set(rule["apiGroups"])
resources = set(rule["resources"])
verbs = set(rule["verbs"])
for extraRule in extraRules:
passes = 0
apiGroupsExtra = set(extraRule["apiGroups"])
resourcesExtra = set(extraRule["resources"])
verbsExtra = set(extraRule["verbs"])
passes += len(apiGroupsExtra.intersection(apiGroups)) >= len(apiGroupsExtra)
passes += len(resourcesExtra.intersection(resources)) >= len(resourcesExtra)
passes += len(verbsExtra.intersection(verbs)) >= len(verbsExtra)
if passes >= 3:
if extraRule not in passedRules:
passedRules.append(extraRule)
if extraRule in rulesToAssign:
rulesToAssign.remove(extraRule)
break
prompt_text = "Apply Changes?"
if len(rulesToAssign) == 0:
print(f"The role {roleName} seems to already have the necessary permissions!")
prompt_text = "Proceed anyways?"
for ruleToAssign in rulesToAssign:
role["rules"].append(ruleToAssign)
delete_if_exists(role, "creationTimestamp")
delete_if_exists(role, "resourceVersion")
delete_if_exists(role, "uid")
new_role = json.dumps(role, indent=3)
uid = uuid.uuid4()
filename = f"Role-{roleName}-New_Permissions-{uid}-TemporaryFile.json"
try:
with open(filename, "w+") as f:
f.write(new_role)
f.flush()
prompt = "y"
if not skipConfirmation:
prompt = input(
doTerminalCmd(f"kubectl diff -f {filename}".split(" ")) + f"\n{prompt_text} y/n: "
).lower().strip()
while prompt != "y" and prompt != "n":
prompt = input("Please make a valid selection. y/n: ").lower().strip()
if prompt == "y":
print(doTerminalCmd(f"kubectl apply -f {filename}".split(" ")))
except Exception as e:
print(e)
os.remove(f"./{filename}")
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("-n", "--namespace",
help="Namespace of the Role. By default its the VirtualCluster's namespace",
required=True,
dest="namespace"
)
parser.add_argument("-p", "--no-prompt",
help="Applies the patches without asking first",
dest="no_prompt",
default=False,
action="store_true"
)
args = parser.parse_args()
emrRoleRules = [
{
"apiGroups": [""],
"resources": ["persistentvolumeclaims"],
"verbs": ["list", "create", "delete"]
}
]
driverRoleRules = [
{
"apiGroups": [""],
"resources": ["persistentvolumeclaims"],
"verbs": ["list", "create", "delete"]
},
{
"apiGroups": [""],
"resources": ["services"],
"verbs": ["get", "list", "describe", "create", "delete", "watch"]
}
]
clientRoleRules = [
{
"apiGroups": [""],
"resources": ["persistentvolumeclaims"],
"verbs": ["list", "create", "delete"]
}
]
patchRole("emr-containers", args.namespace, emrRoleRules, args.no_prompt)
patchRole("emr-containers-role-spark-driver", args.namespace, driverRoleRules, args.no_prompt)
patchRole("emr-containers-role-spark-client", args.namespace, clientRoleRules, args.no_prompt)