void edge_row_cb()

in src/backend/utils/load/ag_load_edges.c [61:152]


void edge_row_cb(int delim __attribute__((unused)), void *data)
{

    csv_edge_reader *cr = (csv_edge_reader*)data;
    batch_insert_state *batch_state = cr->batch_state;

    size_t i, n_fields;
    int64 start_id_int;
    graphid start_vertex_graph_id;
    int start_vertex_type_id;

    int64 end_id_int;
    graphid end_vertex_graph_id;
    int end_vertex_type_id;

    graphid edge_id;
    int64 entry_id;
    TupleTableSlot *slot;

    n_fields = cr->cur_field;

    if (cr->row == 0)
    {
        cr->header_num = cr->cur_field;
        cr->header_row_length = cr->curr_row_length;
        cr->header_len = (size_t* )malloc(sizeof(size_t *) * cr->cur_field);
        cr->header = malloc((sizeof (char*) * cr->cur_field));

        for (i = 0; i<cr->cur_field; i++)
        {
            cr->header_len[i] = cr->fields_len[i];
            cr->header[i] = strndup(cr->fields[i], cr->header_len[i]);
        }
    }
    else
    {
        entry_id = nextval_internal(cr->label_seq_relid, true);
        edge_id = make_graphid(cr->label_id, entry_id);

        start_id_int = strtol(cr->fields[0], NULL, 10);
        start_vertex_type_id = get_label_id(cr->fields[1], cr->graph_oid);
        end_id_int = strtol(cr->fields[2], NULL, 10);
        end_vertex_type_id = get_label_id(cr->fields[3], cr->graph_oid);

        start_vertex_graph_id = make_graphid(start_vertex_type_id, start_id_int);
        end_vertex_graph_id = make_graphid(end_vertex_type_id, end_id_int);

        /* Get the appropriate slot from the batch state */
        slot = batch_state->slots[batch_state->num_tuples];

        /* Clear the slots contents */
        ExecClearTuple(slot);

        /* Fill the values in the slot */
        slot->tts_values[0] = GRAPHID_GET_DATUM(edge_id);
        slot->tts_values[1] = GRAPHID_GET_DATUM(start_vertex_graph_id);
        slot->tts_values[2] = GRAPHID_GET_DATUM(end_vertex_graph_id);
        slot->tts_values[3] = AGTYPE_P_GET_DATUM(
                                create_agtype_from_list_i(
                                    cr->header, cr->fields,
                                    n_fields, 4, cr->load_as_agtype));
        slot->tts_isnull[0] = false;
        slot->tts_isnull[1] = false;
        slot->tts_isnull[2] = false;
        slot->tts_isnull[3] = false;

        /* Make the slot as containing virtual tuple */
        ExecStoreVirtualTuple(slot);
        batch_state->num_tuples++;

        if (batch_state->num_tuples >= batch_state->max_tuples)
        {
            /* Insert the batch when it is full (i.e. BATCH_SIZE) */
            insert_batch(batch_state, cr->label_name, cr->graph_oid);
            batch_state->num_tuples = 0;
        }
    }

    for (i = 0; i < n_fields; ++i)
    {
        free(cr->fields[i]);
    }

    if (cr->error)
    {
        ereport(NOTICE,(errmsg("THere is some error")));
    }

    cr->cur_field = 0;
    cr->curr_row_length = 0;
    cr->row += 1;
}