in cronjobs/src/commands/refresh_signature.py [0:0]
def refresh_signature(event, context, **kwargs):
"""Refresh the signatures of each collection."""
server_url = event["server"]
auth = event.get("refresh_signature_auth") or os.getenv("REFRESH_SIGNATURE_AUTH")
max_signature_age = int(
event.get("max_signature_age", os.getenv("MAX_SIGNATURE_AGE", 7))
)
# Look at the collections in the changes endpoint.
bucket = event.get("bucket", "monitor")
collection = event.get("collection", "changes")
client = Client(server_url=server_url, bucket=bucket, collection=collection)
print("Looking at %s: " % client.get_endpoint("collection"))
changes = client.get_records()
# Look at the signer configuration on the server.
server_info = client.server_info()
errors = []
for change in changes:
# 0. Figure out which was the source collection of this signed collection.
source = get_signed_source(server_info, change)
if source is None:
# Skip if change is not on a destination collection (eg. review collection)
continue
client = Client(
server_url=server_url,
bucket=source["bucket"],
collection=source["collection"],
auth=auth,
)
try:
print("Looking at %s:" % client.get_endpoint("collection"), end=" ")
# 1. Grab collection information
collection_metadata = client.get_collection()["data"]
last_modified = collection_metadata["last_modified"]
status = collection_metadata.get("status")
last_signature_date = collection_metadata.get("last_signature_date")
# Skip signature refresh if the collection was signed recently.
if last_signature_date:
last_signature_dt = datetime.fromisoformat(last_signature_date)
last_signature_age = (utcnow() - last_signature_dt).days
if last_signature_age < max_signature_age:
print("SKIP (only %s days old)" % last_signature_age)
continue
# 2. Refresh!
print("Refresh signature: ", end="")
new_metadata = client.patch_collection(data={"status": "to-resign"})
last_modified = new_metadata["data"]["last_modified"]
# 3. Display the status of the collection
last_modified_dt = timestamp_to_date(last_modified)
print("status=", status, "at", last_modified_dt, "(", last_modified, ")")
except KintoException as e:
print(e)
errors.append(e)
if len(errors) > 0:
error_messages = [str(e) for e in errors]
raise RefreshError("\n" + "\n\n".join(error_messages))