security-policies/bundle/compliance/policy/process/data_adapter.rego (47 lines of code) (raw):
package compliance.policy.process.data_adapter
import future.keywords.if
is_process if {
input.type == "process"
}
process_name := name if {
is_process
name = input.resource.stat.Name
}
process_args_list := args_list if {
is_process
# Gets all the process arguments of the current process
# Expects format as the following: --<key><delimiter><value> for example: --config=a.json
# Notice that the first argument is always the process path
args_list := split(input.resource.command, " --")
}
# Parses a single argument and returns a tuple of the flag and the value
parse_argument(argument) := [flag, value] if {
# We would like to split the argument by the first delimiter
# The dilimiter can be either a space or an equal sign
splitted_argument := regex.split(`\s|\=`, argument)
flag = concat("", ["--", splitted_argument[0]])
# We would like to take the entire string after the first delimiter
value = concat("=", array.slice(splitted_argument, 1, count(splitted_argument) + 1))
}
process_config := config if {
is_process
config := {key: value | value = input.resource.external_data[key]}
}
is_kube_apiserver if {
process_name == "kube-apiserver"
}
is_kube_controller_manager if {
process_name == "kube-controller"
}
is_kube_scheduler if {
process_name == "kube-scheduler"
}
is_etcd if {
process_name == "etcd"
}
is_kubelet if {
process_name == "kubelet"
}
process_args[flag] := value if {
[flag, value] = parse_argument(process_args_list[_])
}