in sources/cg_schema.c [1310:1775]
// Main entry point for schema upgrade code generation.
//
// Emits a complete upgrade script and writes it to options.file_names[0]
// (exactly one output file is required).  The script is assembled from
// three buffers that are concatenated in this order at the end:
//
//   decls    : script header, schema crc comment, helper declarations, the
//              declared schema, and DECLARE PROC lines for migration procs
//   preamble : private helper procedures (baseline, views, indices, triggers,
//              recreate groups, temp schema, facet setup, drops, cleanup)
//   main     : <global>_perform_upgrade_steps, the version query proc,
//              <global>_perform_needed_upgrades and the <global> entry proc
//
// Per-version work is accumulated in `upgrade` (DDL) and `pending`
// (migration proc calls) and handed to cg_schema_end_version each time the
// version number changes, and once more after the last annotation.
//
// Note: processing @unsub/@resub annotations sets/clears SCHEMA_FLAG_UNSUB
// on the target's sem_type as a side effect, the flag is consulted when a
// later CREATE COLUMN annotation for the same table is processed.
//
//  * head : the root of the AST, checked for semantic errors before any output
cql_noexport void cg_schema_upgrade_main(ast_node *head) {
  Contract(options.file_names_count == 1);
  cql_exit_on_semantic_errors(head);
  exit_on_no_global_proc();

  schema_annotation *notes;
  size_t schema_items_count;
  recreate_annotation *recreates;
  size_t recreate_items_count;
  int32_t max_schema_version;

  // all five out arguments are filled in by the crc computation
  llint_t schema_crc = cg_schema_compute_crc(
    &notes,
    &schema_items_count,
    &recreates,
    &recreate_items_count,
    &max_schema_version);

  // read back below as a bitmap: bit (v % 8) of byte (v / 8) is checked for version v
  bytebuf version_bits;
  bytebuf_open(&version_bits);

  CHARBUF_OPEN(preamble);
  CHARBUF_OPEN(main);
  CHARBUF_OPEN(decls);
  CHARBUF_OPEN(pending);
  CHARBUF_OPEN(upgrade);
  CHARBUF_OPEN(baseline);
  CHARBUF_OPEN(drops);

  // script header and declarations
  bprintf(&decls, "%s", rt->source_prefix);
  bprintf(&decls, "-- no columns will be considered hidden in this script\n");
  bprintf(&decls, "-- DDL in procs will not count as declarations\n");
  bprintf(&decls, "@SCHEMA_UPGRADE_SCRIPT;\n\n");
  bprintf(&decls, "-- schema crc %lld\n\n", schema_crc);

  cg_schema_emit_facet_functions(&decls);
  cg_schema_emit_sqlite_master(&decls);

  bprintf(&decls, "-- declare full schema of tables and views to be upgraded and their dependencies -- \n");
  cg_generate_schema_by_mode(&decls, SCHEMA_TO_DECLARE);
  cg_schema_helpers(&decls);

  // DECLARE PROC lines for migration procs are appended to decls in the loop below
  bprintf(&decls, "-- declared upgrade procedures if any\n");

  cg_schema_emit_baseline_tables_proc(&preamble, &baseline);

  // the drop/create counts decide whether the matching CALLs are emitted into main
  int32_t view_creates = 0, view_drops = 0;
  cg_schema_manage_views(&preamble, &view_drops, &view_creates);

  int32_t index_creates = 0, index_drops = 0;
  cg_schema_manage_indices(&preamble, &index_drops, &index_creates);

  int32_t trigger_creates = 0, trigger_drops = 0;
  cg_schema_manage_triggers(&preamble, &trigger_drops, &trigger_creates);

  if (recreate_items_count) {
    cg_schema_manage_recreate_tables(&preamble, &decls, recreates, recreate_items_count);
  }

  bool_t has_temp_schema = cg_schema_emit_temp_schema_proc(&preamble);
  bool_t one_time_drop_needed = false;

  // code to read the facets into the hash table
  bprintf(&preamble, "@attribute(cql:private)\n");
  bprintf(&preamble, "CREATE PROCEDURE %s_setup_facets()\n", global_proc_name);
  bprintf(&preamble, "BEGIN\n");
  bprintf(&preamble, " BEGIN TRY\n");
  bprintf(&preamble, " SET %s_facets := cql_facets_new();\n", global_proc_name);
  bprintf(&preamble, " DECLARE C CURSOR FOR SELECT * from %s_cql_schema_facets;\n", global_proc_name);
  bprintf(&preamble, " LOOP FETCH C\n");
  bprintf(&preamble, " BEGIN\n");
  bprintf(&preamble, " LET added := cql_facet_add(%s_facets, C.facet, C.version);\n", global_proc_name);
  bprintf(&preamble, " END;\n");
  bprintf(&preamble, " END TRY;\n");
  bprintf(&preamble, " BEGIN CATCH\n");
  bprintf(&preamble, " -- if table doesn't exist we just have empty facets, that's ok\n");
  bprintf(&preamble, " END CATCH;\n");
  bprintf(&preamble, "END;\n\n");

  // the main upgrade worker
  bprintf(&main, "\n@attribute(cql:private)\n");
  bprintf(&main, "CREATE PROCEDURE %s_perform_upgrade_steps()\n", global_proc_name);
  bprintf(&main, "BEGIN\n");
  bprintf(&main, " DECLARE schema_version LONG INTEGER NOT NULL;\n");

  if (view_drops) {
    bprintf(&main, " -- dropping all views --\n");
    bprintf(&main, " CALL %s_cql_drop_all_views();\n\n", global_proc_name);
  }

  if (index_drops) {
    bprintf(&main, " -- dropping condemned or changing indices --\n");
    bprintf(&main, " CALL %s_cql_drop_all_indices();\n\n", global_proc_name);
  }

  if (trigger_drops) {
    bprintf(&main, " -- dropping condemned or changing triggers --\n");
    bprintf(&main, " CALL %s_cql_drop_all_triggers();\n\n", global_proc_name);
  }

  // the baseline is installed only if there is baseline text and no minimum
  // schema version was requested (used > 1 is this function's test for a
  // non-empty buffer); the crc of the baseline text is stored as version 0
  if (baseline.used > 1 && options.min_schema_version == 0) {
    llint_t baseline_crc = (llint_t)crc_charbuf(&baseline);
    bprintf(&main, " ---- install baseline schema if needed ----\n\n");
    bprintf(&main, " CALL %s_cql_get_version_crc(0, schema_version);\n", global_proc_name);
    bprintf(&main, " IF schema_version != %lld THEN\n", baseline_crc);
    bprintf(&main, " CALL %s_cql_install_baseline_schema();\n", global_proc_name);
    bprintf(&main, " CALL %s_cql_set_version_crc(0, %lld);\n", global_proc_name, baseline_crc);
    bprintf(&main, " END IF;\n\n");
  }

  uint32_t prev_version = 0;

  // the count is a size_t so the index is too (avoids a signed/unsigned compare)
  for (size_t i = 0; i < schema_items_count; i++) {
    schema_annotation *note = &notes[i];
    ast_node *version_annotation = note->annotation_ast;
    uint32_t type = note->annotation_type;

    Contract(type >= SCHEMA_ANNOTATION_FIRST && type <= SCHEMA_ANNOTATION_LAST);
    Contract(is_ast_version_annotation(version_annotation));
    EXTRACT_OPTION(vers, version_annotation->left);

    Invariant(note->version == vers);
    Invariant(vers > 0);  // already verified to be positive

    // annotations older than the requested minimum version generate nothing
    if (vers < options.min_schema_version) {
      continue;
    }

    // version changed: hand off what was accumulated for the previous version
    if (prev_version != vers) {
      cg_schema_end_version(&main, &upgrade, &pending, prev_version, &version_bits);
      prev_version = (uint32_t)vers;
    }

    CSTR target_name = note->target_name;

    Invariant(type >= SCHEMA_ANNOTATION_FIRST && type <= SCHEMA_ANNOTATION_LAST);

    // note this test comes after the version bookkeeping above, so an excluded
    // item still advances prev_version
    if (!include_from_region(note->target_ast->sem->region, SCHEMA_TO_UPGRADE)) {
      continue;
    }

    // no schema maintenance for blob storage tables, they aren't physical tables
    if (is_ast_create_table_stmt(note->target_ast) && is_table_blob_storage(note->target_ast)) {
      continue;
    }

    // set for @unsub/@resub, those never use the generic migration proc call below
    bool_t subscription_management = false;

    switch (type) {
      case SCHEMA_ANNOTATION_CREATE_COLUMN: {
        if (note->target_ast->sem->sem_type & SCHEMA_FLAG_UNSUB) {
          // do not emit the alter table add column if we are
          // currently unsubscribed, note that resub happens AFTER
          // CREATE COLUMN and UNSUB happens before, this is important!
          continue;
        }

        ast_node *def = note->column_ast;
        Contract(is_ast_col_def(def));
        EXTRACT_NOTNULL(col_def_type_attrs, def->left);
        EXTRACT_NOTNULL(col_def_name_type, col_def_type_attrs->left);
        EXTRACT_STRING(col_name, col_def_name_type->left);

        CSTR col_type = coretype_string(def->sem->sem_type);

        gen_sql_callbacks callbacks;
        init_gen_sql_callbacks(&callbacks);
        callbacks.mode = gen_mode_no_annotations;

        CHARBUF_OPEN(sql_out);
        gen_set_output_buffer(&sql_out);

        // no-op callbacks still suppress @create/@delete which is not legal in alter table
        gen_col_def_with_callbacks(def, &callbacks);

        bprintf(&upgrade, " -- altering table %s to add column %s %s;\n\n",
          target_name,
          col_name,
          col_type);

        // the ALTER TABLE is guarded by a check for the column so that it is skipped
        // when the column is already present
        bprintf(&upgrade, " IF NOT %s_check_column_exists('%s', '*[( ]%s %s*') THEN \n",
          global_proc_name,
          target_name,
          col_name,
          col_type);

        bprintf(&upgrade, " ALTER TABLE %s ADD COLUMN %s;\n",
          target_name,
          sql_out.ptr);

        bprintf(&upgrade, " END IF;\n\n");

        CHARBUF_CLOSE(sql_out);
        break;
      }

      case SCHEMA_ANNOTATION_DELETE_COLUMN: {
        ast_node *def = note->column_ast;
        Contract(is_ast_col_def(def));
        EXTRACT_NOTNULL(col_def_type_attrs, def->left);
        EXTRACT_NOTNULL(col_def_name_type, col_def_type_attrs->left);
        EXTRACT_STRING(col_name, col_def_name_type->left);

        // only a comment is emitted here, there is no DDL for a deleted column
        bprintf(&upgrade, " -- logical delete of column %s from %s; -- no ddl\n\n", col_name, target_name);
        break;
      }

      case SCHEMA_ANNOTATION_CREATE_TABLE: {
        // check for one time drop
        EXTRACT_ANY(dot, version_annotation->right);
        if (dot && is_ast_dot(dot)) {
          EXTRACT_STRING(lhs, dot->left);
          EXTRACT_STRING(rhs, dot->right);

          // cql.from_recreate (case insensitive) is the builtin that requests the drop
          if (!Strcasecmp(lhs, "cql") && !Strcasecmp(rhs, "from_recreate")) {
            bprintf(&upgrade, " -- one time drop %s\n\n", target_name);
            bprintf(&upgrade, " CALL %s_cql_one_time_drop('%s', %d);\n\n", global_proc_name, target_name, vers);
            one_time_drop_needed = true;
          }
        }

        bprintf(&upgrade, " -- creating table %s\n\n", target_name);

        gen_sql_callbacks callbacks;
        init_gen_sql_callbacks(&callbacks);
        callbacks.col_def_callback = cg_suppress_new_col_def;
        callbacks.if_not_exists_callback = cg_schema_force_if_not_exists;
        callbacks.mode = gen_mode_no_annotations;

        CHARBUF_OPEN(sql_out);
        gen_set_output_buffer(&sql_out);
        gen_statement_with_callbacks(note->target_ast, &callbacks);  // only the original columns

        bindent(&upgrade, &sql_out, 6);
        bprintf(&upgrade, ";\n\n");

        CHARBUF_CLOSE(sql_out);
        break;
      }

      case SCHEMA_ANNOTATION_DELETE_TABLE:
        // drops go into their own buffer, they become one proc called after all versions
        bprintf(&drops, " DROP TABLE IF EXISTS %s;\n", target_name);
        break;

      // Note: @create is invalid for INDEX/VIEW/TRIGGER so there can be no such annotation

      case SCHEMA_ANNOTATION_DELETE_INDEX:
      case SCHEMA_ANNOTATION_DELETE_VIEW:
      case SCHEMA_ANNOTATION_DELETE_TRIGGER:
        // no annotation based actions other than migration proc (handled below)
        Contract(version_annotation->right);
        bprintf(&upgrade, " -- delete migration proc for %s will run\n\n", target_name);
        break;

      case SCHEMA_ANNOTATION_UNSUB:
        // @recreate tables do not need unsub, they will just delete like they usually do
        // annotation not generated for such cases as it would be a no-op anyway
        Invariant(!note->target_ast->sem->recreate);

        // unsub demands a drop
        bprintf(&upgrade, " -- unsubscription of %s\n\n", target_name);
        bprintf(&upgrade, " DROP TABLE IF EXISTS %s;\n\n", target_name);

        // current status: unsubscribed
        note->target_ast->sem->sem_type |= SCHEMA_FLAG_UNSUB;
        subscription_management = true;
        break;

      case SCHEMA_ANNOTATION_RESUB: {
        // @recreate tables do not need resub, they will just delete like they usually do
        // annotation not generated for such cases as it would be a no-op anyway
        Invariant(!note->target_ast->sem->recreate);

        // emit a create if not exists at this version
        // note that we do not (!) emit a drop here because it's possible that
        // something else will be later added to this schema rev causing it
        // to re-run and we want it to be idempotent
        bprintf(&upgrade, " -- resubscribe to %s\n\n", target_name);

        gen_sql_callbacks callbacks;
        init_gen_sql_callbacks(&callbacks);
        callbacks.col_def_callback = cg_suppress_col_def_by_version;
        callbacks.col_def_context = &note->version;
        callbacks.if_not_exists_callback = cg_schema_force_if_not_exists;
        callbacks.mode = gen_mode_sql;
        callbacks.long_to_int_conv = true;

        CHARBUF_OPEN(sql_out);
        gen_set_output_buffer(&sql_out);

        // only the columns as of the current version
        gen_statement_with_callbacks(note->target_ast, &callbacks);

        bindent(&upgrade, &sql_out, 6);
        bprintf(&upgrade, ";\n\n");

        CHARBUF_CLOSE(sql_out);

        // current status: subscribed
        note->target_ast->sem->sem_type &= sem_not(SCHEMA_FLAG_UNSUB);
        subscription_management = true;
        break;
      }

      case SCHEMA_ANNOTATION_AD_HOC:
        // no annotation based actions other than migration proc (handled below)
        Contract(version_annotation->right);
        bprintf(&upgrade, " -- ad hoc migration proc %s will run\n\n", target_name);
        break;
    }

    // handle any migration proc for any annotation
    if (!subscription_management && version_annotation->right) {
      // call any non-builtin migrations the generic way, builtins get whatever special handling they need
      if (!is_ast_dot(version_annotation->right)) {
        EXTRACT_STRING(proc, version_annotation->right);

        // the proc is called only if its facet is not yet recorded, then the facet is recorded
        bprintf(&pending, " IF cql_facet_find(%s_facets, '%s') = -1 THEN\n", global_proc_name, proc);
        bprintf(&pending, " CALL %s();\n", proc);
        bprintf(&pending, " CALL %s_cql_set_facet_version('%s', %d);\n", global_proc_name, proc, vers);
        bprintf(&pending, " END IF;\n");

        bprintf(&decls, "DECLARE PROC %s() USING TRANSACTION;\n", proc);
      }
    }
  }

  // hand off whatever was accumulated for the final version
  cg_schema_end_version(&main, &upgrade, &pending, prev_version, &version_bits);

  if (drops.used > 1) {
    bprintf(&main, " CALL %s_cql_drop_tables();\n", global_proc_name);

    bprintf(&preamble, "@attribute(cql:private)\n");
    bprintf(&preamble, "CREATE PROC %s_cql_drop_tables()\n", global_proc_name);
    bprintf(&preamble, "BEGIN\n");
    bprintf(&preamble, "%s", drops.ptr);
    bprintf(&preamble, "END;\n");
  }

  if (recreate_items_count) {
    bprintf(&main, " CALL %s_cql_recreate_tables();\n", global_proc_name);
  }

  if (view_creates) {
    bprintf(&main, " CALL %s_cql_create_all_views();\n", global_proc_name);
  }

  if (index_creates) {
    bprintf(&main, " CALL %s_cql_create_all_indices();\n", global_proc_name);
  }

  if (trigger_creates) {
    bprintf(&main, " CALL %s_cql_create_all_triggers();\n", global_proc_name);
  }

  // build a "(v1),(v2),..." list of every version in 1..prev_version whose bit
  // is not set in version_bits; their 'cql_schema_v<N>' facets get deleted
  CHARBUF_OPEN(missing_versions);

  for (uint32_t v = 1; v <= prev_version; v++) {
    uint32_t byteIndex = v / 8;
    uint32_t bitMask = 1 << (v % 8);

    if (version_bits.used > byteIndex && ((uint32_t)(version_bits.ptr[byteIndex]) & bitMask)) {
      continue;
    }

    if (missing_versions.used > 1) {
      bprintf(&missing_versions, ",");
    }
    bprintf(&missing_versions, "(%u)", v);
  }

  if (missing_versions.used > 1) {
    bprintf(&main, " CALL %s_cleanup_unused_versions();\n", global_proc_name);

    bprintf(&preamble, "\n@attribute(cql:private)\n");
    bprintf(&preamble, "CREATE PROC %s_cleanup_unused_versions()\n", global_proc_name);
    bprintf(&preamble, "BEGIN\n");
    bprintf(&preamble, " WITH\n");
    bprintf(&preamble, " V(v) AS (VALUES %s),\n", missing_versions.ptr);
    bprintf(&preamble, " F(f) AS (SELECT 'cql_schema_v'||v from V)\n");
    bprintf(&preamble, " DELETE FROM %s_cql_schema_facets WHERE facet IN (SELECT f from F);\n", global_proc_name);
    bprintf(&preamble, "END;\n");
  }

  CHARBUF_CLOSE(missing_versions);

  // prev_version is unsigned, so it is formatted with %u
  bprintf(&main, " CALL %s_cql_set_facet_version('cql_schema_version', %u);\n", global_proc_name, prev_version);
  bprintf(&main, " CALL %s_cql_set_facet_version('cql_schema_crc', %lld);\n", global_proc_name, schema_crc);
  bprintf(&main, "END;\n\n");

  // reports the recorded schema version and the version this script would produce
  bprintf(&main, "CREATE PROCEDURE %s_get_current_and_proposed_versions(\n", global_proc_name);
  bprintf(&main, " out current long not null,\n");
  bprintf(&main, " out proposed long not null\n");
  bprintf(&main, " )\n");
  bprintf(&main, "BEGIN\n");
  bprintf(&main, " SET current := %s_cql_get_facet_version('cql_schema_version');\n", global_proc_name);
  bprintf(&main, " SET proposed := %d;\n", max_schema_version);
  bprintf(&main, "END;\n");

  // downgrade check, then the upgrade steps, then the diff of facets before/after
  bprintf(&main, "@attribute(cql:private)\n");
  bprintf(&main, "CREATE PROCEDURE %s_perform_needed_upgrades()\n", global_proc_name);
  bprintf(&main, "BEGIN\n");
  bprintf(&main, " -- check for downgrade --\n");
  bprintf(&main, " IF cql_facet_find(%s_facets, 'cql_schema_version') > %d THEN\n", global_proc_name, max_schema_version);
  bprintf(&main, " SELECT 'downgrade detected' facet;\n");
  bprintf(&main, " ELSE\n");
  bprintf(&main, " -- save the current facets so we can diff them later --\n");
  bprintf(&main, " CALL %s_save_cql_schema_facets();\n", global_proc_name);
  bprintf(&main, " CALL %s_perform_upgrade_steps();\n\n", global_proc_name);
  bprintf(&main, " -- finally produce the list of differences\n");
  bprintf(&main, " SELECT T1.facet FROM\n");
  bprintf(&main, " %s_cql_schema_facets T1\n", global_proc_name);
  bprintf(&main, " LEFT OUTER JOIN %s_cql_schema_facets_saved T2\n", global_proc_name);
  bprintf(&main, " ON T1.facet = T2.facet\n");
  bprintf(&main, " WHERE T1.version is not T2.version;\n");
  bprintf(&main, " END IF;\n");
  bprintf(&main, "END;\n\n");

  // the public entry point: the upgrade is attempted only if the stored crc differs
  bprintf(&main, "CREATE PROCEDURE %s()\n", global_proc_name);
  bprintf(&main, "BEGIN\n");
  bprintf(&main, " DECLARE schema_crc LONG INTEGER NOT NULL;\n");
  bprintf(&main, "\n");
  bprintf(&main, " -- create schema facets information table --\n");
  bprintf(&main, " CALL %s_create_cql_schema_facets_if_needed();\n\n", global_proc_name);
  bprintf(&main, " -- fetch the last known schema crc, if it's different do the upgrade --\n");
  bprintf(&main, " CALL %s_cql_get_facet_version('cql_schema_crc', schema_crc);\n\n", global_proc_name);
  bprintf(&main, " IF schema_crc <> %lld THEN\n", schema_crc);
  bprintf(&main, " BEGIN TRY\n");
  bprintf(&main, " CALL %s_setup_facets();\n", global_proc_name);
  bprintf(&main, " CALL %s_perform_needed_upgrades();\n", global_proc_name);
  bprintf(&main, " END TRY;\n");
  bprintf(&main, " BEGIN CATCH\n");
  bprintf(&main, " CALL cql_facets_delete(%s_facets);\n", global_proc_name);
  bprintf(&main, " SET %s_facets := 0;\n", global_proc_name);
  bprintf(&main, " THROW;\n");
  bprintf(&main, " END CATCH;\n");
  bprintf(&main, " CALL cql_facets_delete(%s_facets);\n", global_proc_name);
  bprintf(&main, " SET %s_facets := 0;\n", global_proc_name);
  bprintf(&main, " ELSE\n");
  bprintf(&main, " -- some canonical result for no differences --\n");
  bprintf(&main, " SELECT 'no differences' facet;\n");
  bprintf(&main, " END IF;\n");

  if (has_temp_schema) {
    bprintf(&main, " ---- install temp schema after upgrade is complete ----\n");
    bprintf(&main, " CALL %s_cql_install_temp_schema();\n\n", global_proc_name);
  }

  bprintf(&main, "END;\n\n");

  // the helper goes in decls, which is still open; it lands ahead of preamble in the output
  if (one_time_drop_needed) {
    cg_schema_emit_one_time_drop(&decls);
  }

  // final assembly: declarations, then helper procs, then the main procs
  CHARBUF_OPEN(output_file);
  bprintf(&output_file, "%s\n", decls.ptr);
  bprintf(&output_file, "%s", preamble.ptr);
  bprintf(&output_file, "%s", main.ptr);

  cql_write_file(options.file_names[0], output_file.ptr);

  // buffers are closed in the reverse of the order in which they were opened
  CHARBUF_CLOSE(output_file);
  CHARBUF_CLOSE(drops);
  CHARBUF_CLOSE(baseline);
  CHARBUF_CLOSE(upgrade);
  CHARBUF_CLOSE(pending);
  CHARBUF_CLOSE(decls);
  CHARBUF_CLOSE(main);
  CHARBUF_CLOSE(preamble);
  bytebuf_close(&version_bits);
}