def get_signed_source()

in cronjobs/src/commands/refresh_signature.py [0:0]


def get_signed_source(server_info, change):
    # Small helper to identify the source collection from a potential
    # signing destination collection, like those mentioned in the changes endpoint
    # (eg. blocklists/plugins -> staging/plugins).
    signed_resources = server_info["capabilities"]["signer"]["resources"]
    for r in signed_resources:
        match_destination = r["destination"]["bucket"] == change["bucket"] and (
            r["destination"]["collection"] is None
            or r["destination"]["collection"] == change["collection"]
        )
        if match_destination:
            return {
                "bucket": r["source"]["bucket"],
                # Per-bucket configuration.
                "collection": r["source"]["collection"] or change["collection"],
            }