in src/backend/executor/cypher_merge.c [959:1256]
static Datum merge_vertex(cypher_merge_custom_scan_state *css,
cypher_target_node *node, ListCell *next, List *list,
path_entry **path_array, int path_index,
bool should_insert)
{
bool isNull;
Datum id;
EState *estate = css->css.ss.ps.state;
ExprContext *econtext = css->css.ss.ps.ps_ExprContext;
ResultRelInfo *resultRelInfo = node->resultRelInfo;
TupleTableSlot *elemTupleSlot = node->elemTupleSlot;
TupleTableSlot *scanTupleSlot = econtext->ecxt_scantuple;
Assert(node->type == LABEL_KIND_VERTEX);
/*
* Vertices in a path might already exists. If they do get the id
* to pass to the edges before and after it. Otherwise, insert the
* new vertex into it's table and then pass the id along.
*/
if (CYPHER_TARGET_NODE_INSERT_ENTITY(node->flags))
{
ResultRelInfo **old_estate_es_result_relations = NULL;
Datum prop;
/*
* Set estate's result relation to the vertex's result
* relation.
*
* Note: This obliterates what was their previously
*/
/* save the old result relation info */
old_estate_es_result_relations = estate->es_result_relations;
estate->es_result_relations = &resultRelInfo;
ExecClearTuple(elemTupleSlot);
/* if we not are going to insert, we need our structure pointers */
if (should_insert == false &&
(path_array == NULL || path_array[path_index] == NULL))
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid input parameter combination")));
}
/*
* If we shouldn't insert the vertex, we need to retrieve it from the
* storage structure.
*/
if (should_insert == false &&
path_array != NULL &&
path_array[path_index] != NULL)
{
id = path_array[path_index]->id;
isNull = path_array[path_index]->id_isNull;
}
/*
* Otherwise, we need to retrieve the vertex normally and store its
* unique values if the storage structure exists.
*/
else if (should_insert == true)
{
/* get the next graphid for this vertex */
id = ExecEvalExpr(node->id_expr_state, econtext, &isNull);
if (path_array != NULL && path_array[path_index] != NULL)
{
/* store it */
path_array[path_index]->id = id;
path_array[path_index]->id_isNull = isNull;
}
}
else
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid input parameter combination")));
}
/* put the id values into the tuple slot */
elemTupleSlot->tts_values[vertex_tuple_id] = id;
elemTupleSlot->tts_isnull[vertex_tuple_id] = isNull;
/*
* Retrieve the properties and isNull values if the storage structure
* exists.
*/
if (path_array != NULL && path_array[path_index] != NULL)
{
prop = path_array[path_index]->prop;
isNull = path_array[path_index]->prop_isNull;
}
/* otherwise, get them normally */
else
{
/* get the properties for this vertex */
prop = ExecEvalExpr(node->prop_expr_state, econtext, &isNull);
}
/* put the prop values into the tuple slot */
elemTupleSlot->tts_values[vertex_tuple_properties] = prop;
elemTupleSlot->tts_isnull[vertex_tuple_properties] = isNull;
/*
* Insert the new vertex.
*
* Depending on the currentCommandId, we need to do this one of two
* different ways -
*
* 1) If they are equal, the currentCommandId hasn't been used for an
* update, or it hasn't been incremented after being used. In either
* case, we need to use the current one and then increment it so that
* the following commands will have visibility of this update. Note,
* it isn't our job to update the currentCommandId first and then do
* this check.
*
* 2) If they are not equal, the currentCommandId has been used and/or
* updated. In this case, we can't use it. Otherwise our update won't
* be visible to anything that follows, until the currentCommandId is
* updated again. Remember, visibility is, greater than but not equal
* to, the currentCommandID used for the update. So, in this case we
* need to use the original currentCommandId when begin_cypher_merge
* was initiated as everything under this instance of merge needs to
* be based off of that initial currentCommandId. This allows the
* following command to see the updates generated by this instance of
* merge.
*/
if (should_insert &&
css->base_currentCommandId == GetCurrentCommandId(false))
{
insert_entity_tuple(resultRelInfo, elemTupleSlot, estate);
/*
* Increment the currentCommandId since we processed an update. We
* don't want to do this outside of this block because we don't want
* to inadvertently or unnecessarily update the commandCounterId of
* another command.
*/
CommandCounterIncrement();
}
else if (should_insert)
{
insert_entity_tuple_cid(resultRelInfo, elemTupleSlot, estate,
css->base_currentCommandId);
}
/* restore the old result relation info */
estate->es_result_relations = old_estate_es_result_relations;
/*
* When the vertex is used by clauses higher in the execution tree
* we need to create a vertex datum. When the vertex is a variable,
* add to the scantuple slot. When the vertex is part of a path
* variable, add to the list.
*/
if (CYPHER_TARGET_NODE_OUTPUT(node->flags))
{
Datum result;
/* make the vertex agtype */
result = make_vertex(id, CStringGetDatum(node->label_name), prop);
/* append to the path list */
if (CYPHER_TARGET_NODE_IN_PATH(node->flags))
{
css->path_values = lappend(css->path_values,
DatumGetPointer(result));
}
/*
* Put the vertex in the correct spot in the scantuple, so parent
* execution nodes can reference the newly created variable.
*/
if (CYPHER_TARGET_NODE_IS_VARIABLE(node->flags))
{
bool debug_flag = false;
int tuple_position = node->tuple_position - 1;
/*
* We need to make sure that the tuple_position is within the
* boundaries of the tuple's number of attributes. Otherwise, it
* will corrupt memory. The cases where it doesn't fall within
* are usually due to a variable that is specified but there
* isn't a RETURN clause. In these cases we just don't bother to
* store the value.
*/
if (!debug_flag &&
(tuple_position < scanTupleSlot->tts_tupleDescriptor->natts ||
scanTupleSlot->tts_tupleDescriptor->natts != 1))
{
/* store the result */
scanTupleSlot->tts_values[tuple_position] = result;
scanTupleSlot->tts_isnull[tuple_position] = false;
}
}
}
}
/*
* If we have the storage structure pointers, we have already retrieved the
* ID from the datum in the scan tuple, so just retrieve it from the
* structure.
*/
else if (path_array != NULL && path_array[path_index] != NULL)
{
/* retrieve the id of the vertex */
id = path_array[path_index]->id;
/*
* Add the Datum to the list of entities for creating the path variable
*/
if (CYPHER_TARGET_NODE_IN_PATH(node->flags))
{
Datum vertex = scanTupleSlot->tts_values[node->tuple_position - 1];
css->path_values = lappend(css->path_values,
DatumGetPointer(vertex));
}
}
else
{
agtype *a = NULL;
Datum d;
agtype_value *v = NULL;
agtype_value *id_value = NULL;
/* check that the variable isn't NULL */
if (scanTupleSlot->tts_isnull[node->tuple_position - 1])
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Existing variable %s cannot be NULL in MERGE clause",
node->variable_name)));
}
/* get the vertex agtype in the scanTupleSlot */
d = scanTupleSlot->tts_values[node->tuple_position - 1];
a = DATUM_GET_AGTYPE_P(d);
/* Convert to an agtype value */
v = get_ith_agtype_value_from_container(&a->root, 0);
if (v->type != AGTV_VERTEX)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("agtype must resolve to a vertex")));
}
/* extract the id agtype field */
id_value = GET_AGTYPE_VALUE_OBJECT_VALUE(v, "id");
/* extract the graphid and cast to a Datum */
id = GRAPHID_GET_DATUM(id_value->val.int_value);
/*
* Its possible the variable has already been deleted. There are two
* ways this can happen. One is the query explicitly deleted the
* variable, the is_deleted flag will catch that. However, it is
* possible the user deleted the vertex using another variable name. We
* need to scan the table to find the vertex's current status relative
* to this CREATE clause. If the variable was initially created in this
* clause, we can skip this check, because the transaction system
* guarantees that nothing can happen to that tuple, as far as we are
* concerned with at this time.
*/
if (!SAFE_TO_SKIP_EXISTENCE_CHECK(node->flags))
{
if (!entity_exists(estate, css->graph_oid, DATUM_GET_GRAPHID(id)))
{
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("vertex assigned to variable %s was deleted",
node->variable_name)));
}
}
/*
* Add the Datum to the list of entities for creating the path variable
*/
if (CYPHER_TARGET_NODE_IN_PATH(node->flags))
{
Datum vertex = scanTupleSlot->tts_values[node->tuple_position - 1];
css->path_values = lappend(css->path_values,
DatumGetPointer(vertex));
}
}
/* If the path continues, create the next edge, passing the vertex's id. */
if (next != NULL)
{
merge_edge(css, lfirst(next), id, lnext(list, next), list,
path_array, path_index+1, should_insert);
}
return id;
}