in moz_kinto_publisher/main.py [0:0]
def publish_intermediates(*, args, rw_client):
if args.enrolled_json:
# when using a local copy of enrolled.json we don't need to determine
# the most recent run identifier.
run_id = "local"
else:
run_identifiers = workflow.get_run_identifiers(args.filter_bucket)
if not run_identifiers:
log.warning("No run identifiers found")
return
run_id = run_identifiers[-1]
run_id_path = args.download_path / Path(run_id)
run_id_path.mkdir(parents=True, exist_ok=True)
if args.enrolled_json:
intermediates_path = Path(args.enrolled_json)
else:
intermediates_path = run_id_path / Path("enrolled.json")
workflow.download_and_retry_from_google_cloud(
args.filter_bucket,
f"{run_id}/enrolled.json",
intermediates_path,
timeout=timedelta(minutes=5),
)
local_intermediates = load_local_intermediates(
intermediates_path=intermediates_path
)
remote_intermediates, remote_error_records = load_remote_intermediates(
kinto_client=rw_client
)
remote_only = set(remote_intermediates.keys()) - set(local_intermediates.keys())
to_upload = set(local_intermediates.keys()) - set(remote_intermediates.keys())
to_update = set()
for i in set(local_intermediates.keys()) & set(remote_intermediates.keys()):
if not local_intermediates[i].equals(remote_record=remote_intermediates[i]):
to_update.add(i)
remote_expired = set()
for i in remote_only:
try:
if remote_intermediates[i].is_expired(kinto_client=rw_client):
remote_expired.add(i)
except Exception as e:
log.warning(f"Failed to track expiration for {i}: {e}")
log.info(f"Local intermediates: {len(local_intermediates)}")
log.info(f"Remote intermediates: {len(remote_intermediates)}")
log.info(f"- Expired: {len(remote_expired)}")
log.info(f"- In error: {len(remote_error_records)}")
log.info(f"To add: {len(to_upload)}")
log.info(f"To update: {len(to_update)}")
log.info(f"To delete: {len(remote_only)}")
if args.noop:
log.info("Noop flag set, exiting before any intermediate updates")
return
# All intermediates must be in the local list
for unique_id in remote_only:
record = remote_intermediates[unique_id]
log.info(f"Removing deleted intermediate {record}")
try:
record.delete_from_kinto(rw_client=rw_client)
except KintoException as ke:
log.error(f"Couldn't delete record {record}: {ke}")
# Delete any remote records that had errors
# (note these "records" are just dictionaries)
for raw_record in remote_error_records:
log.info(f"Deleting remote record with error: {raw_record}")
try:
rw_client.delete_record(
collection=settings.KINTO_INTERMEDIATES_COLLECTION,
id=raw_record["id"],
)
except KintoException as ke:
log.error(f"Couldn't delete record id {raw_record['id']}: {ke}")
except KeyError: # raw_record doesn't have "id"
log.error(f"Couldn't delete record: {raw_record}")
# New records
for unique_id in to_upload:
record = local_intermediates[unique_id]
log.info(f"Adding new record: {record}")
try:
record.add_to_kinto(rw_client=rw_client)
except KintoException as ke:
log.error(f"Couldn't add record {record}: {ke}")
# Updates
for unique_id in to_update:
local_int = local_intermediates[unique_id]
remote_int = remote_intermediates[unique_id]
log.info(f"Updating record: {remote_int} to {local_int}")
try:
local_int.update_kinto(
rw_client=rw_client,
remote_record=remote_int,
)
except KintoException as ke:
log.error(
f"Failed to update local={local_int} remote={remote_int} exception={ke}"
)
log.info("Verifying correctness...")
verified_intermediates, verified_error_records = load_remote_intermediates(
kinto_client=rw_client
)
if len(verified_error_records) > 0:
raise KintoException(
f"There are {len(verified_error_records)} broken intermediates. Re-run to fix."
)
# Every local intermediate should be in the remote list
for unique_id, local_int in local_intermediates.items():
if unique_id not in verified_intermediates:
raise KintoException(f"Failed to upload {unique_id}")
if not local_int.equals(remote_record=verified_intermediates[unique_id]):
raise KintoException(
"Local/Remote metadata mismatch for uniqueId={}".format(unique_id)
)
# Every remote intermediate should be in the local list
for unique_id in verified_intermediates.keys():
if unique_id not in local_intermediates:
raise KintoException(f"Failed to remove {unique_id}")
rw_client.request_review_of_collection(
collection=settings.KINTO_INTERMEDIATES_COLLECTION
)