in source/hack_parallel/hack_parallel/heap/hh_shared.c [2340:2458]
/*
 * Writes the dependency lists held in the global `deptbl` into the DEPTABLE
 * table of `db_out`, merging them with any rows already stored there.
 *
 * For every slot in [0, dep_size) whose entry count is non-zero, the blob
 * already stored under that key (if any) is fetched, the in-memory values are
 * laid out first with the previously stored values appended after them, and
 * the combined uint32_t array is written back with INSERT OR REPLACE. No
 * de-duplication is performed here. All inserts run in a single transaction.
 *
 * db_out:     open SQLite connection to write to. It is closed by this
 *             function before returning, so the caller must not reuse it.
 * build_info: forwarded unchanged to write_sqlite_header().
 *
 * Returns the number of in-memory values written; values that were only
 * carried over from an existing row are not counted.
 *
 * SQLite return codes are checked with assert_sql(). Timing and row
 * statistics are printed to stderr. The process is aborted if the scratch
 * buffer cannot be grown.
 */
static size_t hh_update_dep_table_helper(
    sqlite3* const db_out,
    const char* const build_info) {
  struct timeval start_t = {0};
  gettimeofday(&start_t, NULL);

  // Create header for verification
  write_sqlite_header(db_out, build_info);

  // Hand-off the data to the OS for writing and continue,
  // don't wait for it to complete
  assert_sql(
      sqlite3_exec(db_out, "PRAGMA synchronous = OFF", NULL, 0, NULL),
      SQLITE_OK);
  // Store the rollback journal in memory
  assert_sql(
      sqlite3_exec(db_out, "PRAGMA journal_mode = MEMORY", NULL, 0, NULL),
      SQLITE_OK);
  // Use one transaction for all the insertions
  assert_sql(
      sqlite3_exec(db_out, "BEGIN TRANSACTION", NULL, 0, NULL), SQLITE_OK);

  // Scratch buffer shared by all slots; it only ever grows, and `capacity`
  // tracks how many uint32_t entries it can currently hold.
  uint32_t* values = NULL;
  size_t capacity = 0;

  sqlite3_stmt* insert_stmt = NULL;
  sqlite3_stmt* select_dep_stmt = NULL;
  const char* sql =
      "INSERT OR REPLACE INTO DEPTABLE (KEY_VERTEX, VALUE_VERTEX) VALUES (?,?)";
  assert_sql(
      sqlite3_prepare_v2(db_out, sql, -1, &insert_stmt, NULL), SQLITE_OK);

  size_t existing_rows_lookup_duration = 0;
  size_t existing_rows_updated_count = 0;
  size_t edges_added = 0;
  size_t new_rows_count = 0;

  for (size_t slot = 0; slot < dep_size; ++slot) {
    size_t count = deptbl_entry_count_for_slot(slot);
    if (count == 0) {
      continue;
    }

    deptbl_entry_t slotval = deptbl[slot];
    query_result_t existing = get_dep_sqlite_blob_with_duration(
        db_out,
        slotval.s.key.num,
        &select_dep_stmt,
        &existing_rows_lookup_duration);
    // Make sure we don't have malformed output
    assert(existing.size % sizeof(uint32_t) == 0);
    size_t existing_count = existing.size / sizeof(uint32_t);

    // Grow the scratch buffer only when this slot needs more room than any
    // previous one did.
    size_t needed = count + existing_count;
    if (needed > capacity) {
      // Go through a temporary so the old buffer is not lost on failure, and
      // check explicitly so the failure is caught even when assert() is
      // compiled out.
      uint32_t* grown = realloc(values, needed * sizeof(uint32_t));
      if (grown == NULL) {
        fprintf(
            stderr,
            "Out of memory growing dep table write buffer to %zu entries\n",
            needed);
        free(values);
        abort();
      }
      values = grown;
      capacity = needed;
    }
    assert(values != NULL);

    if (slotval.raw != 0 && slotval.s.key.tag == TAG_KEY) {
      size_t iter = 0;

      // This is the head of a linked list aka KEY VERTEX
      assert_sql(
          sqlite3_bind_int(insert_stmt, 1, slotval.s.key.num), SQLITE_OK);

      // Then combine each value to VALUE VERTEX
      while (slotval.s.next.tag == TAG_NEXT) {
        assert(slotval.s.next.num < dep_size);
        slotval = deptbl[slotval.s.next.num];
        values[iter] = slotval.s.key.num;
        iter++;
      }
      // The final "next" in the list is always a value, not a next pointer.
      values[iter] = slotval.s.next.num;
      iter++;

      // Append whatever was already stored for this key after the new values.
      if (existing_count > 0) {
        assert(existing.blob != NULL);
        memcpy(
            &(values[iter]),
            existing.blob,
            existing_count * (sizeof(uint32_t)));
        iter += existing_count;
        existing_rows_updated_count += 1;
      } else {
        new_rows_count += 1;
      }

      // SQLITE_TRANSIENT makes SQLite take its own copy of the blob, so the
      // scratch buffer can be reused for the next slot.
      assert_sql(
          sqlite3_bind_blob(
              insert_stmt,
              2,
              values,
              iter * sizeof(uint32_t),
              SQLITE_TRANSIENT),
          SQLITE_OK);
      assert_sql(sqlite3_step(insert_stmt), SQLITE_DONE);
      assert_sql(sqlite3_clear_bindings(insert_stmt), SQLITE_OK);
      assert_sql(sqlite3_reset(insert_stmt), SQLITE_OK);

      // Count only the values that came from memory. This is done inside the
      // branch so that a slot which wrote nothing cannot make the unsigned
      // subtraction wrap around.
      edges_added += iter - existing_count;
    }
  }

  free(values);

  assert_sql(sqlite3_finalize(insert_stmt), SQLITE_OK);
  assert_sql(sqlite3_exec(db_out, "END TRANSACTION", NULL, 0, NULL), SQLITE_OK);
  start_t = log_duration("Finished SQL Transaction", start_t);

  // The counters are size_t, so print them with %zu.
  fprintf(
      stderr,
      "Lookup of existing rows took %zu us\n",
      existing_rows_lookup_duration);
  fprintf(stderr, "Wrote %zu new rows\n", new_rows_count);
  fprintf(stderr, "Updated %zu existing rows\n", existing_rows_updated_count);

  destroy_prepared_stmt(&select_dep_stmt);
  assert_sql(sqlite3_close(db_out), SQLITE_OK);
  log_duration("Finished closing SQL connection", start_t);
  return edges_added;
}