static void process_update_list()

in src/backend/executor/cypher_set.c [349:600]


static void process_update_list(CustomScanState *node)
{
    cypher_set_custom_scan_state *css = (cypher_set_custom_scan_state *)node;
    ExprContext *econtext = css->css.ss.ps.ps_ExprContext;
    TupleTableSlot *scanTupleSlot = econtext->ecxt_scantuple;
    ListCell *lc;
    EState *estate = css->css.ss.ps.state;
    int *luindex = NULL;
    int lidx = 0;

    /* allocate an array to hold the last update index of each 'entity' */
    luindex = palloc0(sizeof(int) * scanTupleSlot->tts_nvalid);
    /*
     * Iterate through the SET items list and store the loop index of each
     * 'entity' update. As there is only one entry for each entity, this will
     * have the effect of overwriting the previous loop index stored - if this
     * 'entity' is used more than once. This will create an array of the last
     * loop index for the update of that particular 'entity'. This will allow us
     * to correctly update an 'entity' after all other previous updates to that
     * 'entity' have been done.
     */
    foreach (lc, css->set_list->set_items)
    {
        cypher_update_item *update_item = NULL;

        update_item = (cypher_update_item *)lfirst(lc);
        luindex[update_item->entity_position - 1] = lidx;

        /* increment the loop index */
        lidx++;
    }

    /* reset loop index */
    lidx = 0;

    /* iterate through SET set items */
    foreach (lc, css->set_list->set_items)
    {
        agtype_value *altered_properties;
        agtype_value *original_entity_value;
        agtype_value *original_properties;
        agtype_value *id;
        agtype_value *label;
        agtype *original_entity;
        agtype *new_property_value;
        TupleTableSlot *slot;
        ResultRelInfo *resultRelInfo;
        ScanKeyData scan_keys[1];
        TableScanDesc scan_desc;
        bool remove_property;
        char *label_name;
        cypher_update_item *update_item;
        Datum new_entity;
        HeapTuple heap_tuple;
        char *clause_name = css->set_list->clause_name;
        int cid;

        update_item = (cypher_update_item *)lfirst(lc);

        /*
         * If the entity is null, we can skip this update. this will be
         * possible when the OPTIONAL MATCH clause is implemented.
         */
        if (scanTupleSlot->tts_isnull[update_item->entity_position - 1])
        {
            continue;
        }

        if (scanTupleSlot->tts_tupleDescriptor->attrs[update_item->entity_position -1].atttypid != AGTYPEOID)
        {
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("age %s clause can only update agtype",
                            clause_name)));
        }

        original_entity = DATUM_GET_AGTYPE_P(scanTupleSlot->tts_values[update_item->entity_position - 1]);
        original_entity_value = get_ith_agtype_value_from_container(&original_entity->root, 0);

        if (original_entity_value->type != AGTV_VERTEX &&
            original_entity_value->type != AGTV_EDGE)
        {
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("age %s clause can only update vertex and edges",
                            clause_name)));
        }

        /* get the id and label for later */
        id = GET_AGTYPE_VALUE_OBJECT_VALUE(original_entity_value, "id");
        label = GET_AGTYPE_VALUE_OBJECT_VALUE(original_entity_value, "label");

        label_name = pnstrdup(label->val.string.val, label->val.string.len);
        /* get the properties we need to update */
        original_properties = GET_AGTYPE_VALUE_OBJECT_VALUE(original_entity_value,
                                                            "properties");

        /*
         * Determine if the property should be removed. This will be because
         * this is a REMOVE clause or the variable references a variable that is
         * NULL. It will be possible for a variable to be NULL when OPTIONAL
         * MATCH is implemented.
         */
        if (update_item->remove_item)
        {
            remove_property = true;
        }
        else
        {
            remove_property = scanTupleSlot->tts_isnull[update_item->prop_position - 1];
        }

        /*
         * If we need to remove the property, set the value to NULL. Otherwise
         * fetch the evaluated expression from the tuple slot.
         */
        if (remove_property)
        {
            new_property_value = NULL;
        }
        else
        {
            new_property_value = DATUM_GET_AGTYPE_P(scanTupleSlot->tts_values[update_item->prop_position - 1]);
        }

        /* Alter the properties Agtype value. */
        if (update_item->prop_name != NULL &&
            strcmp(update_item->prop_name, "") != 0)
        {
            altered_properties = alter_property_value(original_properties,
                                                      update_item->prop_name,
                                                      new_property_value,
                                                      remove_property);
        }
        else
        {
            altered_properties = alter_properties(
                update_item->is_add ? original_properties : NULL,
                new_property_value);

            /*
             * For SET clause with plus-equal operator, nulls are not removed
             * from the map during transformation because they are required in
             * the executor to alter (merge) properties correctly. Only after
             * that step, they can be removed.
             */
            if (update_item->is_add)
            {
                remove_null_from_agtype_object(altered_properties);
            }
        }

        resultRelInfo = create_entity_result_rel_info(
            estate, css->set_list->graph_name, label_name);

        slot = ExecInitExtraTupleSlot(
            estate, RelationGetDescr(resultRelInfo->ri_RelationDesc),
            &TTSOpsHeapTuple);

        /*
         *  Now that we have the updated properties, create a either a vertex or
         *  edge Datum for the in-memory update, and setup the tupleTableSlot
         *  for the on-disc update.
         */
        if (original_entity_value->type == AGTV_VERTEX)
        {
            new_entity = make_vertex(GRAPHID_GET_DATUM(id->val.int_value),
                                     CStringGetDatum(label_name),
                                     AGTYPE_P_GET_DATUM(agtype_value_to_agtype(altered_properties)));

            slot = populate_vertex_tts(slot, id, altered_properties);
        }
        else if (original_entity_value->type == AGTV_EDGE)
        {
            agtype_value *startid = GET_AGTYPE_VALUE_OBJECT_VALUE(original_entity_value, "start_id");
            agtype_value *endid = GET_AGTYPE_VALUE_OBJECT_VALUE(original_entity_value, "end_id");

            new_entity = make_edge(GRAPHID_GET_DATUM(id->val.int_value),
                                   GRAPHID_GET_DATUM(startid->val.int_value),
                                   GRAPHID_GET_DATUM(endid->val.int_value),
                                   CStringGetDatum(label_name),
                                   AGTYPE_P_GET_DATUM(agtype_value_to_agtype(altered_properties)));

            slot = populate_edge_tts(slot, id, startid, endid,
                                     altered_properties);
        }
        else
        {
            ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                            errmsg("age %s clause can only update vertex and edges",
                                   clause_name)));
        }

        /* place the datum in its tuple table slot position. */
        scanTupleSlot->tts_values[update_item->entity_position - 1] = new_entity;

        /*
         * If the tuple table slot has paths, we need to inspect them to see if
         * the updated entity is contained within them and replace the entity
         * if it is.
         */
        update_all_paths(node,
                         id->val.int_value, DATUM_GET_AGTYPE_P(new_entity));

        /*
         * If the last update index for the entity is equal to the current loop
         * index, then update this tuple.
         */
        cid = estate->es_snapshot->curcid;
        estate->es_snapshot->curcid = GetCurrentCommandId(false);

        if (luindex[update_item->entity_position - 1] == lidx)
        {
            /*
             * Setup the scan key to require the id field on-disc to match the
             * entity's graphid.
             */
            ScanKeyInit(&scan_keys[0], 1, BTEqualStrategyNumber, F_GRAPHIDEQ,
                        GRAPHID_GET_DATUM(id->val.int_value));
            /*
             * Setup the scan description, with the correct snapshot and scan
             * keys.
             */
            scan_desc = table_beginscan(resultRelInfo->ri_RelationDesc,
                                        estate->es_snapshot, 1, scan_keys);
            /* Retrieve the tuple. */
            heap_tuple = heap_getnext(scan_desc, ForwardScanDirection);

            /*
             * If the heap tuple still exists (It wasn't deleted between the
             * match and this SET/REMOVE) update the heap_tuple.
             */
            if (HeapTupleIsValid(heap_tuple))
            {
                heap_tuple = update_entity_tuple(resultRelInfo, slot, estate,
                                                 heap_tuple);
            }
            /* close the ScanDescription */
            table_endscan(scan_desc);
        }

        estate->es_snapshot->curcid = cid;
        /* close relation */
        ExecCloseIndices(resultRelInfo);
        table_close(resultRelInfo->ri_RelationDesc, RowExclusiveLock);

        /* increment loop index */
        lidx++;
    }
    /* free our lookup array */
    pfree_if_not_null(luindex);
}