def publish_intermediates()

in moz_kinto_publisher/main.py [0:0]


def publish_intermediates(*, args, rw_client):
    """Bring the remote intermediates collection in line with the local list.

    The local list is read from ``args.enrolled_json`` when that is set;
    otherwise ``enrolled.json`` is downloaded from ``args.filter_bucket`` for
    the last entry of the run-identifier list. Local and remote entries are
    compared by key, a summary is logged, and then (unless ``args.noop`` is
    set) the remote side is changed in this order: remote-only records are
    deleted, remote records that failed to load are deleted, missing records
    are added, and differing records are updated. The remote state is then
    re-loaded and checked before a review of the collection is requested.

    Args:
        args: Options object; this function reads ``enrolled_json``,
            ``filter_bucket``, ``download_path`` and ``noop`` from it.
        rw_client: Client used for every remote read and write.

    Returns:
        None. Returns early when no run identifiers exist or when
        ``args.noop`` is set.

    Raises:
        KintoException: If, after the changes, the remote side still has
            broken records, lacks a local entry, disagrees with a local
            entry, or still holds an entry that is not in the local list.
    """
    use_local_file = bool(args.enrolled_json)

    if use_local_file:
        # A local copy of enrolled.json makes looking up the most recent
        # run identifier unnecessary.
        run_id = "local"
    else:
        known_runs = workflow.get_run_identifiers(args.filter_bucket)
        if not known_runs:
            log.warning("No run identifiers found")
            return
        run_id = known_runs[-1]

    # The per-run directory is created in both modes.
    work_dir = args.download_path / Path(run_id)
    work_dir.mkdir(parents=True, exist_ok=True)

    if use_local_file:
        enrolled_path = Path(args.enrolled_json)
    else:
        enrolled_path = work_dir / "enrolled.json"
        workflow.download_and_retry_from_google_cloud(
            args.filter_bucket,
            f"{run_id}/enrolled.json",
            enrolled_path,
            timeout=timedelta(minutes=5),
        )

    local_by_id = load_local_intermediates(intermediates_path=enrolled_path)
    remote_by_id, remote_broken = load_remote_intermediates(kinto_client=rw_client)

    # Classify every key: only remote, only local, or present on both sides.
    local_ids = set(local_by_id.keys())
    remote_ids = set(remote_by_id.keys())
    remote_only = remote_ids - local_ids
    to_upload = local_ids - remote_ids
    to_update = {
        shared_id
        for shared_id in local_ids & remote_ids
        if not local_by_id[shared_id].equals(remote_record=remote_by_id[shared_id])
    }

    # Expiry is only counted for the summary below; a failed check is logged
    # and otherwise ignored.
    remote_expired = set()
    for stale_id in remote_only:
        try:
            if remote_by_id[stale_id].is_expired(kinto_client=rw_client):
                remote_expired.add(stale_id)
        except Exception as err:
            log.warning(f"Failed to track expiration for {stale_id}: {err}")

    summary_lines = (
        f"Local intermediates: {len(local_by_id)}",
        f"Remote intermediates: {len(remote_by_id)}",
        f"- Expired: {len(remote_expired)}",
        f"- In error: {len(remote_broken)}",
        f"To add: {len(to_upload)}",
        f"To update: {len(to_update)}",
        f"To delete: {len(remote_only)}",
    )
    for summary_line in summary_lines:
        log.info(summary_line)

    if args.noop:
        log.info("Noop flag set, exiting before any intermediate updates")
        return

    # Anything the remote side has that the local list lacks gets removed.
    for stale_id in remote_only:
        stale = remote_by_id[stale_id]
        log.info(f"Removing deleted intermediate {stale}")
        try:
            stale.delete_from_kinto(rw_client=rw_client)
        except KintoException as kinto_err:
            log.error(f"Couldn't delete record {stale}: {kinto_err}")

    # Remote records that failed to load are removed as well; these are
    # plain dictionaries rather than intermediate objects.
    for broken in remote_broken:
        log.info(f"Deleting remote record with error: {broken}")
        try:
            rw_client.delete_record(
                collection=settings.KINTO_INTERMEDIATES_COLLECTION,
                id=broken["id"],
            )
        except KintoException as kinto_err:
            log.error(f"Couldn't delete record id {broken['id']}: {kinto_err}")
        except KeyError:  # the dictionary has no "id" entry
            log.error(f"Couldn't delete record: {broken}")

    # Entries that exist only locally are created remotely.
    for new_id in to_upload:
        fresh = local_by_id[new_id]
        log.info(f"Adding new record: {fresh}")
        try:
            fresh.add_to_kinto(rw_client=rw_client)
        except KintoException as kinto_err:
            log.error(f"Couldn't add record {fresh}: {kinto_err}")

    # Entries present on both sides but not equal are overwritten from local.
    for changed_id in to_update:
        wanted = local_by_id[changed_id]
        current = remote_by_id[changed_id]
        log.info(f"Updating record: {current} to {wanted}")
        try:
            wanted.update_kinto(rw_client=rw_client, remote_record=current)
        except KintoException as kinto_err:
            log.error(
                f"Failed to update local={wanted} remote={current} exception={kinto_err}"
            )

    # Failures above were only logged, so re-read the remote state and check it.
    log.info("Verifying correctness...")
    verified_by_id, verified_broken = load_remote_intermediates(
        kinto_client=rw_client
    )
    broken_count = len(verified_broken)
    if broken_count > 0:
        raise KintoException(
            f"There are {broken_count} broken intermediates. Re-run to fix."
        )

    # Each local entry has to exist remotely and compare equal.
    for unique_id, expected in local_by_id.items():
        if unique_id not in verified_by_id:
            raise KintoException(f"Failed to upload {unique_id}")
        if not expected.equals(remote_record=verified_by_id[unique_id]):
            raise KintoException(
                f"Local/Remote metadata mismatch for uniqueId={unique_id}"
            )

    # Conversely, nothing may remain remotely that is absent locally.
    for unique_id in verified_by_id.keys():
        if unique_id not in local_by_id:
            raise KintoException(f"Failed to remove {unique_id}")

    rw_client.request_review_of_collection(
        collection=settings.KINTO_INTERMEDIATES_COLLECTION
    )