in kinto-remote-settings/src/kinto_remote_settings/signer/utils.py [0:0]
def parse_resources(raw_resources):
    """Parse the multi-line resources setting into a mapping keyed by source URI.

    Each non-surrounding line of ``raw_resources`` describes one resource as
    either a pair (``source -> destination``) or a triplet
    (``source -> preview -> destination``). ``;`` is accepted as an
    alternative separator to ``->``.

    :param raw_resources: the raw setting value, one resource per line.
    :returns: an ``OrderedDict`` whose keys are ``/buckets/{bid}`` (per-bucket
        resource) or ``/buckets/{bid}/collections/{cid}`` (specific
        collection), built from the source, and whose values are dicts with
        ``"source"``, ``"destination"`` and, when given, ``"preview"`` entries.
    :raises ConfigurationError: if a line is malformed, mixes bucket and
        collection URIs, repeats a URI within the line, repeats a source, or
        if a URI is reused across different resources.
    """
    resources = OrderedDict()

    for res in (line.strip() for line in raw_resources.strip().splitlines()):
        if "->" not in res and ";" not in res:
            raise _malformed_resource(res, "not separated with '->'")

        # Both separators are turned into whitespace; ``split()`` already
        # discards surrounding blanks, so the parts need no further stripping.
        parts = res.replace(";", " ").replace("->", " ").split()
        if len(parts) == 2:
            source_uri, destination_uri = parts
            preview_uri = None
        elif len(parts) == 3:
            source_uri, preview_uri, destination_uri = parts
        else:
            raise _malformed_resource(res, "should be a pair or a triplet")

        # NOTE(review): ``_get_resource`` is defined elsewhere; from its usage
        # here it returns a dict with at least "bucket" and "collection" keys
        # and signals an invalid URI with ValueError.
        try:
            source = _get_resource(source_uri)
            destination = _get_resource(destination_uri)
            preview = _get_resource(preview_uri) if preview_uri else None
        except ValueError as exc:
            raise _malformed_resource(res, exc) from exc

        # Raise if mix-up of per-bucket/specific collection.
        sections = (source, destination) + ((preview,) if preview else ())
        all_per_bucket = all(x["collection"] is None for x in sections)
        all_explicit = all(x["collection"] is not None for x in sections)
        if not all_per_bucket and not all_explicit:
            raise _malformed_resource(res, "cannot mix bucket and collection URIs")

        # Repeated source/preview/destination. A missing preview is replaced
        # by an empty dict so that it always counts as a distinct third value.
        distinct = {tuple(s.items()) for s in (source, preview or {}, destination)}
        if len(distinct) != 3:
            raise _malformed_resource(
                res, "cannot have same value for source, preview or destination"
            )

        # Resources info is returned as a mapping by bucket/collection URI.
        bid = source["bucket"]
        cid = source["collection"]
        if cid is None:
            # Per bucket.
            key = f"/buckets/{bid}"
        else:
            # For a specific collection.
            key = f"/buckets/{bid}/collections/{cid}"

        # We can't have the same source twice.
        if key in resources:
            raise _malformed_resource(res, "cannot repeat resource")

        resources[key] = {"source": source, "destination": destination}
        if preview is not None:
            resources[key]["preview"] = preview

    # Raise if same bid/cid twice/thrice.
    # Theoretically we could support it, but since we never said it was possible
    # and have no test at all for that, prefer safety.
    sources = {tuple(r["source"].items()) for r in resources.values()}
    destinations = [tuple(r["destination"].items()) for r in resources.values()]
    previews = [
        tuple(r["preview"].items()) for r in resources.values() if "preview" in r
    ]

    unique_destinations = set(destinations)
    unique_previews = set(previews)

    if len(unique_destinations) != len(destinations):
        raise ConfigurationError("Resources setting has repeated destination URI")
    if len(unique_previews) != len(previews):
        raise ConfigurationError("Resources setting has repeated preview URI")

    intersects = (
        sources & unique_previews
        or sources & unique_destinations
        or unique_destinations & unique_previews
    )
    if intersects:
        raise ConfigurationError("cannot repeat URIs across resources")

    return resources


def _malformed_resource(res, reason):
    """Build the ConfigurationError reported for the malformed line ``res``.

    The message is formatted in a single pass so that a ``%`` character in
    ``res`` or ``reason`` is never re-interpreted as a format directive.
    """
    return ConfigurationError(
        f"Malformed resource: {reason} (in {res!r}). "
        "See kinto-remote-settings README."
    )