int create_labels_from_csv_file()

in src/backend/utils/load/ag_load_labels.c [189:291]


int create_labels_from_csv_file(char *file_path,
                                char *graph_name,
                                Oid graph_oid,
                                char *label_name,
                                int label_id,
                                bool id_field_exists,
                                bool load_as_agtype)
{

    FILE *fp;
    struct csv_parser p;
    char buf[1024];
    size_t bytes_read;
    unsigned char options = 0;
    csv_vertex_reader cr;
    char *label_seq_name;
    Oid temp_table_relid;

    if (csv_init(&p, options) != 0)
    {
        ereport(ERROR,
                (errmsg("Failed to initialize csv parser\n")));
    }

    temp_table_relid = RelnameGetRelid(GET_TEMP_VERTEX_ID_TABLE(graph_name));
    if (!OidIsValid(temp_table_relid))
    {
        setup_temp_table_for_vertex_ids(graph_name);
        temp_table_relid = RelnameGetRelid(GET_TEMP_VERTEX_ID_TABLE(graph_name));
    }

    csv_set_space_func(&p, is_space);
    csv_set_term_func(&p, is_term);

    fp = fopen(file_path, "rb");
    if (!fp)
    {
        ereport(ERROR,
                (errmsg("Failed to open %s\n", file_path)));
    }

    label_seq_name = get_label_seq_relation_name(label_name);

    memset((void*)&cr, 0, sizeof(csv_vertex_reader));

    cr.alloc = 2048;
    cr.fields = malloc(sizeof(char *) * cr.alloc);
    cr.fields_len = malloc(sizeof(size_t *) * cr.alloc);
    cr.header_row_length = 0;
    cr.curr_row_length = 0;
    cr.graph_name = graph_name;
    cr.graph_oid = graph_oid;
    cr.label_name = label_name;
    cr.label_id = label_id;
    cr.id_field_exists = id_field_exists;
    cr.label_seq_relid = get_relname_relid(label_seq_name, graph_oid);
    cr.load_as_agtype = load_as_agtype;
    cr.temp_table_relid = temp_table_relid;
    
    if (cr.id_field_exists)
    {
        /*
         * Set the curr_seq_num since we will need it to compare with
         * incoming entry_id.
         * 
         * We cant use currval because it will error out if nextval was
         * not called before in the session.
         */
        cr.curr_seq_num = nextval_internal(cr.label_seq_relid, true);
    }

    /* Initialize the batch insert state */
    init_vertex_batch_insert(&cr.batch_state, label_name, graph_oid,
                             cr.temp_table_relid);

    while ((bytes_read=fread(buf, 1, 1024, fp)) > 0)
    {
        if (csv_parse(&p, buf, bytes_read, vertex_field_cb,
                      vertex_row_cb, &cr) != bytes_read)
        {
            ereport(ERROR, (errmsg("Error while parsing file: %s\n",
                                   csv_strerror(csv_error(&p)))));
        }
    }

    csv_fini(&p, vertex_field_cb, vertex_row_cb, &cr);

    /* Finish any remaining batch inserts */
    finish_vertex_batch_insert(&cr.batch_state, label_name, graph_oid,
                               cr.temp_table_relid);

    if (ferror(fp))
    {
        ereport(ERROR, (errmsg("Error while reading file %s\n",
                               file_path)));
    }

    fclose(fp);

    free(cr.fields);
    csv_free(&p);
    return EXIT_SUCCESS;
}