in src/backend/utils/load/ag_load_edges.c [61:152]
void edge_row_cb(int delim __attribute__((unused)), void *data)
{
csv_edge_reader *cr = (csv_edge_reader*)data;
batch_insert_state *batch_state = cr->batch_state;
size_t i, n_fields;
int64 start_id_int;
graphid start_vertex_graph_id;
int start_vertex_type_id;
int64 end_id_int;
graphid end_vertex_graph_id;
int end_vertex_type_id;
graphid edge_id;
int64 entry_id;
TupleTableSlot *slot;
n_fields = cr->cur_field;
if (cr->row == 0)
{
cr->header_num = cr->cur_field;
cr->header_row_length = cr->curr_row_length;
cr->header_len = (size_t* )malloc(sizeof(size_t *) * cr->cur_field);
cr->header = malloc((sizeof (char*) * cr->cur_field));
for (i = 0; i<cr->cur_field; i++)
{
cr->header_len[i] = cr->fields_len[i];
cr->header[i] = strndup(cr->fields[i], cr->header_len[i]);
}
}
else
{
entry_id = nextval_internal(cr->label_seq_relid, true);
edge_id = make_graphid(cr->label_id, entry_id);
start_id_int = strtol(cr->fields[0], NULL, 10);
start_vertex_type_id = get_label_id(cr->fields[1], cr->graph_oid);
end_id_int = strtol(cr->fields[2], NULL, 10);
end_vertex_type_id = get_label_id(cr->fields[3], cr->graph_oid);
start_vertex_graph_id = make_graphid(start_vertex_type_id, start_id_int);
end_vertex_graph_id = make_graphid(end_vertex_type_id, end_id_int);
/* Get the appropriate slot from the batch state */
slot = batch_state->slots[batch_state->num_tuples];
/* Clear the slots contents */
ExecClearTuple(slot);
/* Fill the values in the slot */
slot->tts_values[0] = GRAPHID_GET_DATUM(edge_id);
slot->tts_values[1] = GRAPHID_GET_DATUM(start_vertex_graph_id);
slot->tts_values[2] = GRAPHID_GET_DATUM(end_vertex_graph_id);
slot->tts_values[3] = AGTYPE_P_GET_DATUM(
create_agtype_from_list_i(
cr->header, cr->fields,
n_fields, 4, cr->load_as_agtype));
slot->tts_isnull[0] = false;
slot->tts_isnull[1] = false;
slot->tts_isnull[2] = false;
slot->tts_isnull[3] = false;
/* Make the slot as containing virtual tuple */
ExecStoreVirtualTuple(slot);
batch_state->num_tuples++;
if (batch_state->num_tuples >= batch_state->max_tuples)
{
/* Insert the batch when it is full (i.e. BATCH_SIZE) */
insert_batch(batch_state, cr->label_name, cr->graph_oid);
batch_state->num_tuples = 0;
}
}
for (i = 0; i < n_fields; ++i)
{
free(cr->fields[i]);
}
if (cr->error)
{
ereport(NOTICE,(errmsg("THere is some error")));
}
cr->cur_field = 0;
cr->curr_row_length = 0;
cr->row += 1;
}