in kinto-remote-settings/src/kinto_remote_settings/signer/__init__.py [0:0]
def load_signed_resources_configuration(config):
settings = config.get_settings()
# Load settings from KINTO_SIGNER_* environment variables.
for setting, default_value in DEFAULT_SETTINGS.items():
settings[f"signer.{setting}"] = utils.get_first_matching_setting(
setting_name=setting,
settings=settings,
prefixes=["signer."],
default=default_value,
)
# Expand glob settings into concrete settings using existing objects in DB.
# (eg. "signer.main-workspace.magic-(\w+)" -> "signer.main-workspace.magic-word")
expanded_settings = utils.expand_collections_glob_settings(
config.registry.storage, settings
)
config.add_settings(expanded_settings)
settings.update(**expanded_settings)
# Check source and destination resources are configured.
resources = utils.parse_resources(settings["signer.resources"])
# Expand the resources with the ones that come from per-bucket resources
# and have specific settings.
# For example, consider the case where resource is ``/buckets/dev -> /buckets/prod``
# and there is a setting ``signer.dev.recipes.signer_backend = foo``
output_resources = resources.copy()
for resource in resources.values():
# If collection is not None, there is nothing to expand :)
if resource["source"]["collection"] is not None:
continue
bid = resource["source"]["bucket"]
# Match setting names like signer.stage.specific.autograph.hawk_id
matches = [
(v, re.search(rf"signer\.{bid}\.([^\.]+)\.(.+)", k))
for k, v in settings.items()
]
found = [(v, m.group(1), m.group(2)) for (v, m) in matches if m]
# Expand the list of resources with the ones that contain collection
# specific settings.
for setting_value, cid, setting_name in found:
signer_key = f"/buckets/{bid}/collections/{cid}"
if signer_key not in output_resources:
specific = copy.deepcopy(resource)
specific["source"]["collection"] = cid
specific["destination"]["collection"] = cid
if "preview" in specific:
specific["preview"]["collection"] = cid
output_resources[signer_key] = specific
output_resources[signer_key][setting_name] = setting_value
resources = output_resources
# Determine which are the settings that apply to all buckets/collections.
global_settings = {
"editors_group": "{collection_id}-editors",
"reviewers_group": "{collection_id}-reviewers",
"to_review_enabled": asbool(settings["signer.to_review_enabled"]),
}
# For each resource that is configured, we determine what signer is
# configured and what are the review settings.
# Note: the `resource` values are mutated in place.
config.registry.signers = {}
for signer_key, resource in resources.items():
bid = resource["source"]["bucket"]
server_wide = "signer."
bucket_wide = f"signer.{bid}."
prefixes = [bucket_wide, server_wide]
per_bucket_config = resource["source"]["collection"] is None
if not per_bucket_config:
cid = resource["source"]["collection"]
collection_wide = f"signer.{bid}.{cid}."
deprecated = f"signer.{bid}_{cid}."
prefixes = [collection_wide, deprecated, *prefixes]
# Instantiates the signers associated to this resource.
dotted_location = utils.get_first_matching_setting(
"signer_backend",
settings,
prefixes,
default=DEFAULT_SETTINGS["signer_backend"],
)
signer_module = config.maybe_dotted(dotted_location)
backend = signer_module.load_from_settings(settings, prefixes=prefixes)
config.registry.signers[signer_key] = backend
# Check if review enabled/disabled for this particular resources.
resource_to_review_enabled = asbool(
utils.get_first_matching_setting(
"to_review_enabled",
settings,
prefixes,
default=global_settings["to_review_enabled"],
)
)
# Keep the `to_review_enabled` field in the resource object
# only if it was overriden. In other words, this will be exposed in
# the capabilities if the resource's review setting is different from
# the global server setting.
if resource_to_review_enabled != global_settings["to_review_enabled"]:
resource["to_review_enabled"] = resource_to_review_enabled
else:
resource.pop("to_review_enabled", None)
# Expose the capabilities in the root endpoint.
exposed_resources = [
core_utils.dict_subset(
r, ["source", "destination", "preview", "to_review_enabled"]
)
for r in resources.values()
if "(" not in str(r["source"]) # do not show patterns
]
message = "Digital signatures for integrity and authenticity of records."
docs = "https://github.com/mozilla/remote-settings/tree/main/kinto-remote-settings"
config.registry.api_capabilities.pop("signer", None)
config.add_api_capability(
"signer",
message,
docs,
version=__version__,
resources=exposed_resources,
# Backward compatibility with < v26
group_check_enabled=True,
**global_settings,
)
return resources