def filter_from_conditions_query_dict()

in services/ui_backend_service/api/utils.py [0:0]


def filter_from_conditions_query_dict(query: MultiDict, allowed_keys: List[str] = []):
    """
    Gathers all custom conditions from request query and returns a filter function.

    Each query key has the form ``field`` or ``field:operator``; when no
    operator is given, ``eq`` is used. Each value is a comma-separated list.

    Conditions are skipped (ignored) when:
      * the key starts with ``_`` but not with ``_tags`` (internal conditions),
      * ``allowed_keys`` is not None and the field is not in it,
      * the operator is not in ``operators_to_filters`` (except for ``_tags``,
        which has its own operator set).

    ``_tags`` conditions support the operators ``likeany``, ``likeall``,
    ``any`` and ``all`` (any other operator falls back to ``all``). They are
    matched against ``item['tags'] + item['system_tags']``; an item lacking
    either key never matches.

    For every other field the comma-separated values are OR-ed together
    (``filter_or``). All resulting per-key conditions are then AND-ed
    (``filter_and``).

    Parameters
    ----------
    query : MultiDict
        Request query; only ``.items()`` is used, yielding (key, str value) pairs.
    allowed_keys : List[str]
        Fields that may be filtered on. ``None`` disables the allow-list check.
        With the default empty list every condition is skipped, so the
        returned filter accepts everything.

    Returns
    -------
    Callable
        Filter taking an item; with no applicable conditions it is a no-op
        that returns True for every item.
    """
    # NOTE: the mutable default for allowed_keys is only read, never mutated,
    # and is kept as-is so the signature stays unchanged for callers.

    def _no_op(item):
        return True

    def _join(joiner_fn, fils):
        # reduce() hands back a lone element untouched, so only the empty
        # case needs special handling (joining with _no_op via OR would
        # make the whole condition always true).
        return reduce(joiner_fn, fils) if fils else _no_op

    def _bound_tag_filter(op, term):
        # Bind op/term now so each tag gets its own filter (no late binding).
        _filter = operators_to_filters[op]

        def _apply(item):
            if 'tags' in item and 'system_tags' in item:
                return _filter(item['tags'] + item['system_tags'], term)
            return False

        return _apply

    # _tags operator -> (how to join the per-tag filters, underlying operator).
    # Anything not listed here behaves like "all".
    tag_operators = {
        "likeany": (filter_or, "re"),
        "likeall": (filter_and, "re"),
        "any": (filter_or, "co"),
    }

    filters = []

    for key, val in query.items():
        if key.startswith("_") and not key.startswith('_tags'):
            continue  # skip internal conditions except _tags

        field, separator, operator = key.partition(":")
        if not separator:
            operator = "eq"

        if allowed_keys is not None and field not in allowed_keys:
            continue  # skip conditions on non-allowed fields

        if operator not in operators_to_filters and field != '_tags':
            continue  # skip conditions with no known operators

        if field == "_tags":
            joiner_fn, op = tag_operators.get(operator, (filter_and, "co"))

            # Format each template exactly once. Formatting the already
            # interpolated regex a second time broke on tags containing
            # '{' or '}' (e.g. the quantifier "a{2}").
            # For "co" it is necessary to wrap the value inside quotes as we
            # are checking for containment on a list that has been cast to a string.
            template = ".*{}.*" if op == "re" else "'{}'"
            tag_filters = [
                _bound_tag_filter(op, template.format(tag))
                for tag in val.split(",")
            ]
            filters.append(_join(joiner_fn, tag_filters))
        else:
            # Default case: multiple values for one field are alternatives.
            value_filters = [
                bound_filter(operator, single_val, field)
                for single_val in val.split(",")
            ]
            filters.append(_join(filter_or, value_filters))

    # All per-key conditions must hold; _no_op is the neutral starting point.
    return reduce(filter_and, filters, _no_op)