in cronjobs/src/commands/refresh_signature.py [0:0]
def get_signed_source(server_info, change):
# Small helper to identify the source collection from a potential
# signing destination collection, like those mentioned in the changes endpoint
# (eg. blocklists/plugins -> staging/plugins).
signed_resources = server_info["capabilities"]["signer"]["resources"]
for r in signed_resources:
match_destination = r["destination"]["bucket"] == change["bucket"] and (
r["destination"]["collection"] is None
or r["destination"]["collection"] == change["collection"]
)
if match_destination:
return {
"bucket": r["source"]["bucket"],
# Per-bucket configuration.
"collection": r["source"]["collection"] or change["collection"],
}