in src/backend/utils/load/ag_load_labels.c [70:163]
void vertex_row_cb(int delim __attribute__((unused)), void *data)
{
csv_vertex_reader *cr = (csv_vertex_reader*)data;
batch_insert_state *batch_state = cr->batch_state;
size_t i, n_fields;
graphid vertex_id;
int64 entry_id;
TupleTableSlot *slot;
TupleTableSlot *temp_id_slot;
n_fields = cr->cur_field;
if (cr->row == 0)
{
cr->header_num = cr->cur_field;
cr->header_row_length = cr->curr_row_length;
cr->header_len = (size_t* )malloc(sizeof(size_t *) * cr->cur_field);
cr->header = malloc((sizeof (char*) * cr->cur_field));
for (i = 0; i<cr->cur_field; i++)
{
cr->header_len[i] = cr->fields_len[i];
cr->header[i] = strndup(cr->fields[i], cr->header_len[i]);
}
}
else
{
if (cr->id_field_exists)
{
entry_id = strtol(cr->fields[0], NULL, 10);
if (entry_id > cr->curr_seq_num)
{
DirectFunctionCall2(setval_oid,
ObjectIdGetDatum(cr->label_seq_relid),
Int64GetDatum(entry_id));
cr->curr_seq_num = entry_id;
}
}
else
{
entry_id = nextval_internal(cr->label_seq_relid, true);
}
vertex_id = make_graphid(cr->label_id, entry_id);
/* Get the appropriate slot from the batch state */
slot = batch_state->slots[batch_state->num_tuples];
temp_id_slot = batch_state->temp_id_slots[batch_state->num_tuples];
/* Clear the slots contents */
ExecClearTuple(slot);
ExecClearTuple(temp_id_slot);
/* Fill the values in the slot */
slot->tts_values[0] = GRAPHID_GET_DATUM(vertex_id);
slot->tts_values[1] = AGTYPE_P_GET_DATUM(
create_agtype_from_list(cr->header, cr->fields,
n_fields, entry_id,
cr->load_as_agtype));
slot->tts_isnull[0] = false;
slot->tts_isnull[1] = false;
temp_id_slot->tts_values[0] = GRAPHID_GET_DATUM(vertex_id);
temp_id_slot->tts_isnull[0] = false;
/* Make the slot as containing virtual tuple */
ExecStoreVirtualTuple(slot);
ExecStoreVirtualTuple(temp_id_slot);
batch_state->num_tuples++;
if (batch_state->num_tuples >= batch_state->max_tuples)
{
/* Insert the batch when it is full (i.e. BATCH_SIZE) */
insert_vertex_batch(batch_state, cr->label_name, cr->graph_oid,
cr->temp_table_relid);
batch_state->num_tuples = 0;
}
}
for (i = 0; i < n_fields; ++i)
{
free(cr->fields[i]);
}
if (cr->error)
{
ereport(NOTICE,(errmsg("THere is some error")));
}
cr->cur_field = 0;
cr->curr_row_length = 0;
cr->row += 1;
}