bool agtype_deep_contains()

in src/backend/utils/adt/agtype_util.c [1236:1495]


/*
 * agtype_deep_contains
 *
 * Decide whether the container walked by *val (the lhs) contains the
 * container walked by *m_contained (the rhs).
 *
 * val          - iterator positioned at the start of the lhs container
 * m_contained  - iterator positioned at the start of the rhs container
 * skip_nested  - when true, a nested container found as the value of an
 *                object pair is compared for equality with
 *                compare_agtype_containers_orderability() instead of being
 *                tested recursively for containment.  The flag is consulted
 *                only in the object branch; every recursive call made here
 *                passes false.
 *
 * Returns true when every pair/element of the rhs is matched in the lhs at
 * the same nesting level, false otherwise.  Raises an ERROR if the first
 * token read from the iterators is neither an object nor an array start.
 *
 * Both iterators are advanced as a side effect.  The nested iterators and
 * the temporary element array created by this function are released before
 * it returns.
 *
 * NOTE(review): values returned by find_agtype_value_from_container() are
 * not released here because their ownership is not visible from this
 * function - confirm against that helper before adding a free.
 */
bool agtype_deep_contains(agtype_iterator **val,
                          agtype_iterator **m_contained,
                          bool skip_nested)
{
    agtype_value vval;
    agtype_value vcontained;
    agtype_iterator_token rval;
    agtype_iterator_token rcont;

    /*
     * Guard against stack overflow due to overly complex agtype.
     *
     * Functions called here independently take this precaution, but that
     * might not be sufficient since this is also a recursive function.
     */
    check_stack_depth();

    rval = agtype_iterator_next(val, &vval, false);
    rcont = agtype_iterator_next(m_contained, &vcontained, false);

    if (rval != rcont)
    {
        /*
         * The differing return values can immediately be taken as indicating
         * two differing container types at this nesting level, which is
         * sufficient reason to give up entirely (but it should be the case
         * that they're both some container type).
         */
        Assert(rval == WAGT_BEGIN_OBJECT || rval == WAGT_BEGIN_ARRAY);
        Assert(rcont == WAGT_BEGIN_OBJECT || rcont == WAGT_BEGIN_ARRAY);
        return false;
    }
    else if (rcont == WAGT_BEGIN_OBJECT)
    {
        Assert(vval.type == AGTV_OBJECT);
        Assert(vcontained.type == AGTV_OBJECT);

        /*
         * If the lhs has fewer pairs than the rhs, it can't possibly contain
         * the rhs.  (This conclusion is safe only because we de-duplicate
         * keys in all agtype objects; thus there can be no corresponding
         * optimization in the array case.)  The case probably won't arise
         * often, but since it's such a cheap check we may as well make it.
         */
        if (vval.val.object.num_pairs < vcontained.val.object.num_pairs)
            return false;

        /* Work through rhs "is it contained within?" object */
        for (;;)
        {
            agtype_value *lhs_val; /* lhs_val is from pair in lhs object */

            rcont = agtype_iterator_next(m_contained, &vcontained, false);

            /*
             * When we get through caller's rhs "is it contained within?"
             * object without failing to find one of its values, it's
             * contained.
             */
            if (rcont == WAGT_END_OBJECT)
                return true;

            Assert(rcont == WAGT_KEY);

            /* First, find value by key... */
            lhs_val = find_agtype_value_from_container(
                (*val)->container, AGT_FOBJECT, &vcontained);

            if (!lhs_val)
                return false;

            /*
             * ...at this stage it is apparent that there is at least a key
             * match for this rhs pair.
             */
            rcont = agtype_iterator_next(m_contained, &vcontained, true);

            Assert(rcont == WAGT_VALUE);

            /*
             * Compare rhs pair's value with lhs pair's value just found using
             * key
             */
            if (lhs_val->type != vcontained.type)
            {
                return false;
            }
            else if (IS_A_AGTYPE_SCALAR(lhs_val))
            {
                if (!equals_agtype_scalar_value(lhs_val, &vcontained))
                    return false;
            }
            else if (skip_nested)
            {
                Assert(lhs_val->type == AGTV_BINARY);
                Assert(vcontained.type == AGTV_BINARY);

                /* We will just check if the rhs value is equal to lhs */
                if (compare_agtype_containers_orderability(
                                             lhs_val->val.binary.data,
                                             vcontained.val.binary.data) != 0)
                {
                    return false;
                }
            }
            else
            {
                /* Nested container value (object or array) */
                agtype_iterator *nestval;
                agtype_iterator *nest_contained;
                bool contains;

                Assert(lhs_val->type == AGTV_BINARY);
                Assert(vcontained.type == AGTV_BINARY);

                nestval = agtype_iterator_init(lhs_val->val.binary.data);
                nest_contained =
                    agtype_iterator_init(vcontained.val.binary.data);

                /*
                 * Match "value" side of rhs datum object's pair recursively.
                 * It's a nested structure.
                 *
                 * Note that nesting still has to "match up" at the right
                 * nesting sub-levels.  However, there need only be zero or
                 * more matching pairs (or elements) at each nesting level
                 * (provided the *rhs* pairs/elements *all* match on each
                 * level), which enables searching nested structures for a
                 * single String or other primitive type sub-datum quite
                 * effectively (provided the user constructed the rhs nested
                 * structure such that we "know where to look").
                 *
                 * In other words, the mapping of container nodes in the rhs
                 * "vcontained" agtype to internal nodes on the lhs is
                 * injective, and parent-child edges on the rhs must be mapped
                 * to parent-child edges on the lhs to satisfy the condition
                 * of containment (plus of course the mapped nodes must be
                 * equal).
                 */
                contains = agtype_deep_contains(&nestval, &nest_contained,
                                                false);

                /*
                 * Release the nested iterators, exactly as the array branch
                 * below does, so that an object with many nested pairs does
                 * not accumulate one pair of iterators per pair.
                 */
                pfree_if_not_null(nestval);
                pfree_if_not_null(nest_contained);

                if (!contains)
                    return false;
            }
        }
    }
    else if (rcont == WAGT_BEGIN_ARRAY)
    {
        agtype_value *lhs_conts = NULL;
        uint32 num_lhs_elems = vval.val.array.num_elems;
        bool result = false;

        Assert(vval.type == AGTV_ARRAY);
        Assert(vcontained.type == AGTV_ARRAY);

        /*
         * Handle distinction between "raw scalar" pseudo arrays, and real
         * arrays.
         *
         * A raw scalar may contain another raw scalar, and an array may
         * contain a raw scalar, but a raw scalar may not contain an array. We
         * don't do something like this for the object case, since objects can
         * only contain pairs, never raw scalars (a pair is represented by an
         * rhs object argument with a single contained pair).
         */
        if (vval.val.array.raw_scalar && !vcontained.val.array.raw_scalar)
            return false;

        /*
         * Work through rhs "is it contained within?" array.  Every exit from
         * this loop goes through the single cleanup point after it, so the
         * temporary lhs_conts array is always released.
         */
        for (;;)
        {
            rcont = agtype_iterator_next(m_contained, &vcontained, true);

            /*
             * When we get through caller's rhs "is it contained within?"
             * array without failing to find one of its values, it's
             * contained.
             */
            if (rcont == WAGT_END_ARRAY)
            {
                result = true;
                break;
            }

            Assert(rcont == WAGT_ELEM);

            if (IS_A_AGTYPE_SCALAR(&vcontained))
            {
                if (!find_agtype_value_from_container((*val)->container,
                                                      AGT_FARRAY, &vcontained))
                {
                    result = false;
                    break;
                }
            }
            else
            {
                uint32 i;

                /*
                 * If this is first container found in rhs array (at this
                 * depth), initialize temp lhs array of containers
                 */
                if (lhs_conts == NULL)
                {
                    uint32 j = 0;

                    /* Make room for all possible values */
                    lhs_conts = palloc(sizeof(agtype_value) * num_lhs_elems);

                    for (i = 0; i < num_lhs_elems; i++)
                    {
                        /* Store all lhs elements in temp array */
                        rcont = agtype_iterator_next(val, &vval, true);
                        Assert(rcont == WAGT_ELEM);

                        if (vval.type == AGTV_BINARY)
                            lhs_conts[j++] = vval;
                    }

                    /* No container elements in temp array, so give up now */
                    if (j == 0)
                    {
                        result = false;
                        break;
                    }

                    /* We may have only partially filled array */
                    num_lhs_elems = j;
                }

                /* XXX: Nested array containment is O(N^2) */
                for (i = 0; i < num_lhs_elems; i++)
                {
                    /* Nested container value (object or array) */
                    agtype_iterator *nestval;
                    agtype_iterator *nest_contained;
                    bool contains;

                    nestval =
                        agtype_iterator_init(lhs_conts[i].val.binary.data);
                    nest_contained =
                        agtype_iterator_init(vcontained.val.binary.data);

                    contains = agtype_deep_contains(&nestval, &nest_contained,
                                                    false);

                    /* The helper already tolerates NULL; no extra guard */
                    pfree_if_not_null(nestval);
                    pfree_if_not_null(nest_contained);

                    if (contains)
                        break;
                }

                /*
                 * Report rhs container value is not contained if couldn't
                 * match rhs container to *some* lhs cont
                 */
                if (i == num_lhs_elems)
                {
                    result = false;
                    break;
                }
            }
        }

        /* Single cleanup point for the temporary array of lhs containers */
        pfree_if_not_null(lhs_conts);

        return result;
    }
    else
    {
        ereport(ERROR, (errmsg("invalid agtype container type")));
    }

    ereport(ERROR, (errmsg("unexpectedly fell off end of agtype container")));
    return false;
}