def sync_cinder_policies()

in src/olympia/abuse/tasks.py [0:0]


def sync_cinder_policies():
    max_length = CinderPolicy._meta.get_field('name').max_length
    policies_in_use_q = (
        Q(contentdecision__id__gte=0)
        | Q(reviewactionreason__id__gte=0)
        | Q(expose_in_reviewer_tools=True)
    )

    def sync_policies(data, parent_id=None):
        policies_in_cinder = set()
        for policy in data:
            if (labels := [label['name'] for label in policy.get('labels', [])]) and (
                'AMO' not in labels
            ):
                # If the policy is labelled, but not for AMO, skip it
                continue
            policies_in_cinder.add(policy['uuid'])
            actions = [
                action['slug']
                for action in policy.get('enforcement_actions', [])
                if DECISION_ACTIONS.has_api_value(action['slug'])
            ]
            cinder_policy, _ = CinderPolicy.objects.update_or_create(
                uuid=policy['uuid'],
                defaults={
                    'name': policy['name'][:max_length],
                    'text': policy['description'],
                    'parent_id': parent_id,
                    'modified': datetime.now(),
                    'present_in_cinder': True,
                    'enforcement_actions': actions,
                },
            )

            if nested := policy.get('nested_policies'):
                policies_in_cinder.update(sync_policies(nested, cinder_policy.id))
        return policies_in_cinder

    def delete_unused_orphaned_policies(policies_in_cinder):
        qs = CinderPolicy.objects.exclude(uuid__in=policies_in_cinder).exclude(
            policies_in_use_q
        )
        if qs.exists():
            log.info(
                'Deleting orphaned Cinder Policy not in use: %s',
                list(qs.values_list('uuid', flat=True)),
            )
            qs.delete()

    def mark_used_orphaned_policies(policies_in_cinder):
        qs = (
            CinderPolicy.objects.exclude(uuid__in=policies_in_cinder)
            .exclude(present_in_cinder=False)  # No need to mark those again.
            .filter(policies_in_use_q)
        )
        if qs.exists():
            log.info(
                'Marking orphaned Cinder Policy still in use as such: %s',
                list(qs.values_list('uuid', flat=True)),
            )
            qs.update(present_in_cinder=False)

    url = f'{settings.CINDER_SERVER_URL}policies'
    headers = {
        'accept': 'application/json',
        'content-type': 'application/json',
        'authorization': f'Bearer {settings.CINDER_API_TOKEN}',
    }

    response = requests.get(url, headers=headers)
    response.raise_for_status()
    data = response.json()
    policies_in_cinder = sync_policies(data)
    delete_unused_orphaned_policies(policies_in_cinder)
    mark_used_orphaned_policies(policies_in_cinder)