void vertex_row_cb()

in src/backend/utils/load/ag_load_labels.c [70:163]


void vertex_row_cb(int delim __attribute__((unused)), void *data)
{
    csv_vertex_reader *cr = (csv_vertex_reader*)data;
    batch_insert_state *batch_state = cr->batch_state;
    size_t i, n_fields;
    graphid vertex_id;
    int64 entry_id;
    TupleTableSlot *slot;
    TupleTableSlot *temp_id_slot;

    n_fields = cr->cur_field;

    if (cr->row == 0)
    {
        cr->header_num = cr->cur_field;
        cr->header_row_length = cr->curr_row_length;
        cr->header_len = (size_t* )malloc(sizeof(size_t *) * cr->cur_field);
        cr->header = malloc((sizeof (char*) * cr->cur_field));

        for (i = 0; i<cr->cur_field; i++)
        {
            cr->header_len[i] = cr->fields_len[i];
            cr->header[i] = strndup(cr->fields[i], cr->header_len[i]);
        }
    }
    else
    {
        if (cr->id_field_exists)
        {
            entry_id = strtol(cr->fields[0], NULL, 10);
            if (entry_id > cr->curr_seq_num)
            {
                DirectFunctionCall2(setval_oid,
                                    ObjectIdGetDatum(cr->label_seq_relid),
                                    Int64GetDatum(entry_id));
                cr->curr_seq_num = entry_id;
            }
        }
        else
        {
            entry_id = nextval_internal(cr->label_seq_relid, true);
        }

        vertex_id = make_graphid(cr->label_id, entry_id);

        /* Get the appropriate slot from the batch state */
        slot = batch_state->slots[batch_state->num_tuples];
        temp_id_slot = batch_state->temp_id_slots[batch_state->num_tuples];

        /* Clear the slots contents */
        ExecClearTuple(slot);
        ExecClearTuple(temp_id_slot);

        /* Fill the values in the slot */
        slot->tts_values[0] = GRAPHID_GET_DATUM(vertex_id);
        slot->tts_values[1] = AGTYPE_P_GET_DATUM(
                                create_agtype_from_list(cr->header, cr->fields,
                                                        n_fields, entry_id,
                                                        cr->load_as_agtype));
        slot->tts_isnull[0] = false;
        slot->tts_isnull[1] = false;

        temp_id_slot->tts_values[0] = GRAPHID_GET_DATUM(vertex_id);
        temp_id_slot->tts_isnull[0] = false;

        /* Make the slot as containing virtual tuple */
        ExecStoreVirtualTuple(slot);
        ExecStoreVirtualTuple(temp_id_slot);

        batch_state->num_tuples++;

        if (batch_state->num_tuples >= batch_state->max_tuples)
        {
            /* Insert the batch when it is full (i.e. BATCH_SIZE) */
            insert_vertex_batch(batch_state, cr->label_name, cr->graph_oid,
                                cr->temp_table_relid);
            batch_state->num_tuples = 0;
        }
    }

    for (i = 0; i < n_fields; ++i)
    {
        free(cr->fields[i]);
    }

    if (cr->error)
    {
        ereport(NOTICE,(errmsg("THere is some error")));
    }

    cr->cur_field = 0;
    cr->curr_row_length = 0;
    cr->row += 1;
}