in kinto-remote-settings/src/kinto_remote_settings/changes/utils.py [0:0]
def monitored_timestamps(request):
    """
    Return the list of collection timestamps based on the specified
    lists of resources in settings.

    The ``changes.resources`` setting lists the URIs to include and
    ``changes.excluded_collections`` lists the URIs to leave out. A URI that
    contains ``/collections/`` is used as a prefix as-is; any other URI gets
    a trailing ``/`` appended first, so that one URI cannot match another
    one that merely starts with the same characters.

    :param request: object exposing ``registry.settings`` and
        ``registry.storage``.
    :returns: one ``(bucket_id, collection_id, timestamp)`` tuple for every
        record parent that matches the included URIs and none of the
        excluded ones.
    :rtype: list[tuple[str,str,int]]
    :raises AssertionError: if a matching parent does not resolve to a
        ``collection`` resource.
    """
    settings = request.registry.settings
    storage = request.registry.storage

    def as_prefixes(uris):
        # NOTE(review): a URI containing "/collections/" is matched by plain
        # prefix, so it also matches any parent whose id merely starts with
        # the configured one -- confirm this is intended.
        return tuple(uri if "/collections/" in uri else f"{uri}/" for uri in uris)

    # The prefixes only depend on the settings: build them once instead of
    # once per parent. `str.startswith` accepts a tuple of candidates and
    # returns False for an empty tuple.
    included_prefixes = as_prefixes(aslist(settings.get("changes.resources", "")))
    excluded_prefixes = as_prefixes(
        aslist(settings.get("changes.excluded_collections", ""))
    )

    # In Kinto, the only parents of 'record' resources are collections. Therefore
    # all `parent_id` values are going to be collection URIs.
    all_resources_timestamps = storage.all_resources_timestamps("record")

    results = []
    for parent_id, timestamp in all_resources_timestamps.items():
        if not parent_id.startswith(included_prefixes):
            continue
        if parent_id.startswith(excluded_prefixes):
            continue

        resource_name, matchdict = core_utils.view_lookup_registry(
            request.registry, parent_id
        )
        # Raised explicitly rather than with an `assert` statement so that the
        # check is not stripped when Python runs with -O.
        if resource_name != "collection":
            raise AssertionError(
                f"Record object parent inconsistency: {resource_name}"
            )

        bucket_id, collection_id = matchdict["bucket_id"], matchdict["id"]
        results.append((bucket_id, collection_id, timestamp))
    return results