in src/backend/utils/load/ag_load_labels.c [189:291]
int create_labels_from_csv_file(char *file_path,
char *graph_name,
Oid graph_oid,
char *label_name,
int label_id,
bool id_field_exists,
bool load_as_agtype)
{
FILE *fp;
struct csv_parser p;
char buf[1024];
size_t bytes_read;
unsigned char options = 0;
csv_vertex_reader cr;
char *label_seq_name;
Oid temp_table_relid;
if (csv_init(&p, options) != 0)
{
ereport(ERROR,
(errmsg("Failed to initialize csv parser\n")));
}
temp_table_relid = RelnameGetRelid(GET_TEMP_VERTEX_ID_TABLE(graph_name));
if (!OidIsValid(temp_table_relid))
{
setup_temp_table_for_vertex_ids(graph_name);
temp_table_relid = RelnameGetRelid(GET_TEMP_VERTEX_ID_TABLE(graph_name));
}
csv_set_space_func(&p, is_space);
csv_set_term_func(&p, is_term);
fp = fopen(file_path, "rb");
if (!fp)
{
ereport(ERROR,
(errmsg("Failed to open %s\n", file_path)));
}
label_seq_name = get_label_seq_relation_name(label_name);
memset((void*)&cr, 0, sizeof(csv_vertex_reader));
cr.alloc = 2048;
cr.fields = malloc(sizeof(char *) * cr.alloc);
cr.fields_len = malloc(sizeof(size_t *) * cr.alloc);
cr.header_row_length = 0;
cr.curr_row_length = 0;
cr.graph_name = graph_name;
cr.graph_oid = graph_oid;
cr.label_name = label_name;
cr.label_id = label_id;
cr.id_field_exists = id_field_exists;
cr.label_seq_relid = get_relname_relid(label_seq_name, graph_oid);
cr.load_as_agtype = load_as_agtype;
cr.temp_table_relid = temp_table_relid;
if (cr.id_field_exists)
{
/*
* Set the curr_seq_num since we will need it to compare with
* incoming entry_id.
*
* We cant use currval because it will error out if nextval was
* not called before in the session.
*/
cr.curr_seq_num = nextval_internal(cr.label_seq_relid, true);
}
/* Initialize the batch insert state */
init_vertex_batch_insert(&cr.batch_state, label_name, graph_oid,
cr.temp_table_relid);
while ((bytes_read=fread(buf, 1, 1024, fp)) > 0)
{
if (csv_parse(&p, buf, bytes_read, vertex_field_cb,
vertex_row_cb, &cr) != bytes_read)
{
ereport(ERROR, (errmsg("Error while parsing file: %s\n",
csv_strerror(csv_error(&p)))));
}
}
csv_fini(&p, vertex_field_cb, vertex_row_cb, &cr);
/* Finish any remaining batch inserts */
finish_vertex_batch_insert(&cr.batch_state, label_name, graph_oid,
cr.temp_table_relid);
if (ferror(fp))
{
ereport(ERROR, (errmsg("Error while reading file %s\n",
file_path)));
}
fclose(fp);
free(cr.fields);
csv_free(&p);
return EXIT_SUCCESS;
}