static Datum merge_vertex()

in src/backend/executor/cypher_merge.c [959:1256]


static Datum merge_vertex(cypher_merge_custom_scan_state *css,
                          cypher_target_node *node, ListCell *next, List *list,
                          path_entry **path_array, int path_index,
                          bool should_insert)
{
    bool isNull;
    Datum id;
    EState *estate = css->css.ss.ps.state;
    ExprContext *econtext = css->css.ss.ps.ps_ExprContext;
    ResultRelInfo *resultRelInfo = node->resultRelInfo;
    TupleTableSlot *elemTupleSlot = node->elemTupleSlot;
    TupleTableSlot *scanTupleSlot = econtext->ecxt_scantuple;

    Assert(node->type == LABEL_KIND_VERTEX);

    /*
     * Vertices in a path might already exists. If they do get the id
     * to pass to the edges before and after it. Otherwise, insert the
     * new vertex into it's table and then pass the id along.
     */
    if (CYPHER_TARGET_NODE_INSERT_ENTITY(node->flags))
    {
        ResultRelInfo **old_estate_es_result_relations = NULL;
        Datum prop;

        /*
         * Set estate's result relation to the vertex's result
         * relation.
         *
         * Note: This obliterates what was their previously
         */

        /* save the old result relation info */
        old_estate_es_result_relations = estate->es_result_relations;

        estate->es_result_relations = &resultRelInfo;

        ExecClearTuple(elemTupleSlot);

        /* if we not are going to insert, we need our structure pointers */
        if (should_insert == false &&
            (path_array == NULL || path_array[path_index] == NULL))
        {
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                             errmsg("invalid input parameter combination")));
        }

        /*
         * If we shouldn't insert the vertex, we need to retrieve it from the
         * storage structure.
         */
        if (should_insert == false &&
            path_array != NULL &&
            path_array[path_index] != NULL)
        {
            id = path_array[path_index]->id;
            isNull = path_array[path_index]->id_isNull;
        }
        /*
         * Otherwise, we need to retrieve the vertex normally and store its
         * unique values if the storage structure exists.
         */
        else if (should_insert == true)
        {
            /* get the next graphid for this vertex */
            id = ExecEvalExpr(node->id_expr_state, econtext, &isNull);

            if (path_array != NULL && path_array[path_index] != NULL)
            {
                /* store it */
                path_array[path_index]->id = id;
                path_array[path_index]->id_isNull = isNull;
            }
        }
        else
        {
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                             errmsg("invalid input parameter combination")));
        }

        /* put the id values into the tuple slot */
        elemTupleSlot->tts_values[vertex_tuple_id] = id;
        elemTupleSlot->tts_isnull[vertex_tuple_id] = isNull;

        /*
         * Retrieve the properties and isNull values if the storage structure
         * exists.
         */
        if (path_array != NULL && path_array[path_index] != NULL)
        {
            prop = path_array[path_index]->prop;
            isNull = path_array[path_index]->prop_isNull;
        }
        /* otherwise, get them normally */
        else
        {
            /* get the properties for this vertex */
            prop = ExecEvalExpr(node->prop_expr_state, econtext, &isNull);
        }

        /* put the prop values into the tuple slot */
        elemTupleSlot->tts_values[vertex_tuple_properties] = prop;
        elemTupleSlot->tts_isnull[vertex_tuple_properties] = isNull;

        /*
         * Insert the new vertex.
         *
         * Depending on the currentCommandId, we need to do this one of two
         * different ways -
         *
         * 1) If they are equal, the currentCommandId hasn't been used for an
         *    update, or it hasn't been incremented after being used. In either
         *    case, we need to use the current one and then increment it so that
         *    the following commands will have visibility of this update. Note,
         *    it isn't our job to update the currentCommandId first and then do
         *    this check.
         *
         * 2) If they are not equal, the currentCommandId has been used and/or
         *    updated. In this case, we can't use it. Otherwise our update won't
         *    be visible to anything that follows, until the currentCommandId is
         *    updated again. Remember, visibility is, greater than but not equal
         *    to, the currentCommandID used for the update. So, in this case we
         *    need to use the original currentCommandId when begin_cypher_merge
         *    was initiated as everything under this instance of merge needs to
         *    be based off of that initial currentCommandId. This allows the
         *    following command to see the updates generated by this instance of
         *    merge.
         */
        if (should_insert &&
            css->base_currentCommandId == GetCurrentCommandId(false))
        {
            insert_entity_tuple(resultRelInfo, elemTupleSlot, estate);

            /*
             * Increment the currentCommandId since we processed an update. We
             * don't want to do this outside of this block because we don't want
             * to inadvertently or unnecessarily update the commandCounterId of
             * another command.
             */
            CommandCounterIncrement();
        }
        else if (should_insert)
        {
            insert_entity_tuple_cid(resultRelInfo, elemTupleSlot, estate,
                                    css->base_currentCommandId);
        }

        /* restore the old result relation info */
        estate->es_result_relations = old_estate_es_result_relations;

        /*
         * When the vertex is used by clauses higher in the execution tree
         * we need to create a vertex datum. When the vertex is a variable,
         * add to the scantuple slot. When the vertex is part of a path
         * variable, add to the list.
         */
        if (CYPHER_TARGET_NODE_OUTPUT(node->flags))
        {
            Datum result;

            /* make the vertex agtype */
            result = make_vertex(id, CStringGetDatum(node->label_name), prop);

            /* append to the path list */
            if (CYPHER_TARGET_NODE_IN_PATH(node->flags))
            {
                css->path_values = lappend(css->path_values,
                                           DatumGetPointer(result));
            }

            /*
             * Put the vertex in the correct spot in the scantuple, so parent
             * execution nodes can reference the newly created variable.
             */
            if (CYPHER_TARGET_NODE_IS_VARIABLE(node->flags))
            {
                bool debug_flag = false;
                int tuple_position = node->tuple_position - 1;

                /*
                 * We need to make sure that the tuple_position is within the
                 * boundaries of the tuple's number of attributes. Otherwise, it
                 * will corrupt memory. The cases where it doesn't fall within
                 * are usually due to a variable that is specified but there
                 * isn't a RETURN clause. In these cases we just don't bother to
                 * store the value.
                 */
                if (!debug_flag &&
                    (tuple_position < scanTupleSlot->tts_tupleDescriptor->natts ||
                     scanTupleSlot->tts_tupleDescriptor->natts != 1))
                {
                    /* store the result */
                    scanTupleSlot->tts_values[tuple_position] = result;
                    scanTupleSlot->tts_isnull[tuple_position] = false;
                }
            }
        }
    }
    /*
     * If we have the storage structure pointers, we have already retrieved the
     * ID from the datum in the scan tuple, so just retrieve it from the
     * structure.
     */
    else if (path_array != NULL && path_array[path_index] != NULL)
    {
        /* retrieve the id of the vertex */
        id = path_array[path_index]->id;

        /*
         * Add the Datum to the list of entities for creating the path variable
         */
        if (CYPHER_TARGET_NODE_IN_PATH(node->flags))
        {
            Datum vertex = scanTupleSlot->tts_values[node->tuple_position - 1];
            css->path_values = lappend(css->path_values,
                                       DatumGetPointer(vertex));
        }
    }
    else
    {
        agtype *a = NULL;
        Datum d;
        agtype_value *v = NULL;
        agtype_value *id_value = NULL;

        /* check that the variable isn't NULL */
        if (scanTupleSlot->tts_isnull[node->tuple_position - 1])
        {
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("Existing variable %s cannot be NULL in MERGE clause",
                            node->variable_name)));
        }

        /* get the vertex agtype in the scanTupleSlot */
        d = scanTupleSlot->tts_values[node->tuple_position - 1];
        a = DATUM_GET_AGTYPE_P(d);

        /* Convert to an agtype value */
        v = get_ith_agtype_value_from_container(&a->root, 0);

        if (v->type != AGTV_VERTEX)
        {
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("agtype must resolve to a vertex")));
        }

        /* extract the id agtype field */
        id_value = GET_AGTYPE_VALUE_OBJECT_VALUE(v, "id");

        /* extract the graphid and cast to a Datum */
        id = GRAPHID_GET_DATUM(id_value->val.int_value);

        /*
         * Its possible the variable has already been deleted. There are two
         * ways this can happen. One is the query explicitly deleted the
         * variable, the is_deleted flag will catch that. However, it is
         * possible the user deleted the vertex using another variable name. We
         * need to scan the table to find the vertex's current status relative
         * to this CREATE clause. If the variable was initially created in this
         * clause, we can skip this check, because the transaction system
         * guarantees that nothing can happen to that tuple, as far as we are
         * concerned with at this time.
         */
        if (!SAFE_TO_SKIP_EXISTENCE_CHECK(node->flags))
        {
            if (!entity_exists(estate, css->graph_oid, DATUM_GET_GRAPHID(id)))
            {
                ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                     errmsg("vertex assigned to variable %s was deleted",
                            node->variable_name)));
            }
        }

        /*
         * Add the Datum to the list of entities for creating the path variable
         */
        if (CYPHER_TARGET_NODE_IN_PATH(node->flags))
        {
            Datum vertex = scanTupleSlot->tts_values[node->tuple_position - 1];
            css->path_values = lappend(css->path_values,
                                       DatumGetPointer(vertex));
        }
    }

    /* If the path continues, create the next edge, passing the vertex's id. */
    if (next != NULL)
    {
        merge_edge(css, lfirst(next), id, lnext(list, next), list,
                   path_array, path_index+1, should_insert);
    }

    return id;
}