in python/qpid_dispatch_internal/policy/policy_local.py [0:0]
def compile_access_ruleset(self, name, policy_in, policy_out, warnings, errors):
"""
Compile a vhost schema from processed json format to local internal format.
@param[in] name vhost name
@param[in] policy_in raw policy to be validated
@param[out] policy_out validated Internal format
@param[out] warnings nonfatal irregularities observed
@param[out] errors descriptions of failure
@return - policy is usable. If True then warnings[] may contain useful
information about fields that are ignored. If False then
warnings[] may contain info and errors[0] will hold the
description of why the policy was rejected.
"""
cerror = []
# rulesets may not come through standard config so make nice defaults
policy_out[PolicyKeys.KW_MAXCONN] = 65535
policy_out[PolicyKeys.KW_MAXCONNPERHOST] = 65535
policy_out[PolicyKeys.KW_MAXCONNPERUSER] = 65535
policy_out[PolicyKeys.KW_CONNECTION_ALLOW_DEFAULT] = False
policy_out[PolicyKeys.KW_GROUPS] = {}
policy_out[PolicyKeys.KW_MAX_MESSAGE_SIZE] = None
policy_out[PolicyKeys.KW_VHOST_ALIASES] = []
# validate the options
for key, val in policy_in.items():
if key not in self.allowed_ruleset_options:
warnings.append("Policy vhost '%s' option '%s' is ignored." %
(name, key))
if key in [PolicyKeys.KW_MAXCONN,
PolicyKeys.KW_MAXCONNPERHOST,
PolicyKeys.KW_MAXCONNPERUSER
]:
if not self.validateNumber(val, 0, 65535, cerror):
msg = ("Policy vhost '%s' option '%s' has error '%s'." %
(name, key, cerror[0]))
errors.append(msg)
return False
policy_out[key] = val
elif key in [PolicyKeys.KW_MAX_MESSAGE_SIZE
]:
if not self.validateNumber(val, 0, 0, cerror):
msg = ("Policy vhost '%s' option '%s' has error '%s'." %
(name, key, cerror[0]))
errors.append(msg)
return False
policy_out[key] = val
elif key in [PolicyKeys.KW_CONNECTION_ALLOW_DEFAULT]:
if not isinstance(val, bool):
errors.append("Policy vhost '%s' option '%s' must be of type 'bool' but is '%s'" %
(name, key, type(val)))
return False
policy_out[key] = val
elif key in [PolicyKeys.KW_VHOST_ALIASES]:
# vhost aliases is a CSV string. convert to a list
val0 = [x.strip(' ') for x in val.split(PolicyKeys.KC_CONFIG_LIST_SEP)]
# Reject aliases that duplicate the vhost itself or other aliases
val = []
for vtest in val0:
if vtest == name:
errors.append("Policy vhost '%s' option '%s' value '%s' duplicates vhost name" %
(name, key, vtest))
return False
if vtest in val:
errors.append("Policy vhost '%s' option '%s' value '%s' is duplicated" %
(name, key, vtest))
return False
val.append(vtest)
policy_out[key] = val
elif key in [PolicyKeys.KW_GROUPS]:
if not isinstance(val, dict):
errors.append("Policy vhost '%s' option '%s' must be of type 'dict' but is '%s'" %
(name, key, type(val)))
return False
for skey, sval in val.items():
newsettings = {}
if not self.compile_app_settings(name, skey, sval, newsettings, warnings, errors):
return False
policy_out[key][skey] = {}
policy_out[key][skey].update(newsettings)
# Verify that each user is in only one group.
# Create user-to-group map for looking up user's group
policy_out[PolicyKeys.RULESET_U2G_MAP] = {}
if PolicyKeys.KW_GROUPS in policy_out:
for group, groupsettings in policy_out[PolicyKeys.KW_GROUPS].items():
if PolicyKeys.KW_USERS in groupsettings:
users = [x.strip(' ') for x in groupsettings[PolicyKeys.KW_USERS].split(PolicyKeys.KC_CONFIG_LIST_SEP)]
for user in users:
if user in policy_out[PolicyKeys.RULESET_U2G_MAP]:
errors.append("Policy vhost '%s' user '%s' is in multiple user groups '%s' and '%s'" %
(name, user, policy_out[PolicyKeys.RULESET_U2G_MAP][user], group))
return False
else:
policy_out[PolicyKeys.RULESET_U2G_MAP][user] = group
else:
warnings.append("Policy vhost '%s' user group '%s' has no defined users. This policy has no effect" % (name, group))
# Default connections require a default settings
if policy_out[PolicyKeys.KW_CONNECTION_ALLOW_DEFAULT]:
if PolicyKeys.KW_DEFAULT_SETTINGS not in policy_out[PolicyKeys.KW_GROUPS]:
errors.append("Policy vhost '%s' allows connections by default but default settings are not defined" %
(name))
return False
return True