cql_noexport void cg_schema_upgrade_main()

in sources/cg_schema.c [1310:1775]


// Entry point for generating the schema upgrade script.
//
// The script is assembled from three text sections that are concatenated and
// written to the single output file (options.file_names[0]):
//
//   * decls    -- source prefix, @SCHEMA_UPGRADE_SCRIPT, the schema crc comment,
//                 facet helpers, the declared schema, and DECLARE PROC lines
//                 for any user supplied migration procs
//   * preamble -- private helper procedures (baseline install, view/index/
//                 trigger management, recreate tables, temp schema, facet
//                 setup, table drops, unused version cleanup)
//   * main     -- the upgrade worker, the version query proc, the downgrade
//                 check/diff proc, and finally the global upgrade proc itself
//
// The per-version work is accumulated in `upgrade` (DDL for the version) and
// `pending` (migration proc calls) and handed to cg_schema_end_version each
// time the version number changes, and once more after the last annotation.
//
// Parameters:
//   head -- root of the AST; used here only for the semantic error check.
//
// Preconditions: exactly one output file name was supplied (Contract), the
// AST has no semantic errors and a global proc name is set (both checks exit
// on failure).
cql_noexport void cg_schema_upgrade_main(ast_node *head) {
  Contract(options.file_names_count == 1);

  cql_exit_on_semantic_errors(head);
  exit_on_no_global_proc();

  // all of these are out parameters of cg_schema_compute_crc
  schema_annotation* notes;
  size_t schema_items_count;
  recreate_annotation* recreates;
  size_t recreate_items_count;
  int32_t max_schema_version;
  llint_t schema_crc = cg_schema_compute_crc(
     &notes,
     &schema_items_count,
     &recreates,
     &recreate_items_count,
     &max_schema_version);

  // handed to cg_schema_end_version for each version and read back below as a
  // bitmap indexed by version number (byte v/8, bit v%8) to find the versions
  // that were not recorded
  bytebuf version_bits;
  bytebuf_open(&version_bits);

  CHARBUF_OPEN(preamble);
  CHARBUF_OPEN(main);
  CHARBUF_OPEN(decls);
  CHARBUF_OPEN(pending);
  CHARBUF_OPEN(upgrade);
  CHARBUF_OPEN(baseline);
  CHARBUF_OPEN(drops);

  bprintf(&decls, "%s", rt->source_prefix);
  bprintf(&decls, "-- no columns will be considered hidden in this script\n");
  bprintf(&decls, "-- DDL in procs will not count as declarations\n");
  bprintf(&decls, "@SCHEMA_UPGRADE_SCRIPT;\n\n");
  bprintf(&decls, "-- schema crc %lld\n\n", schema_crc);

  cg_schema_emit_facet_functions(&decls);
  cg_schema_emit_sqlite_master(&decls);
  bprintf(&decls, "-- declare full schema of tables and views to be upgraded and their dependencies -- \n");
  cg_generate_schema_by_mode(&decls, SCHEMA_TO_DECLARE);
  cg_schema_helpers(&decls);

  // the DECLARE PROC lines for migration procs are appended to decls as the
  // annotations are processed in the loop below
  bprintf(&decls, "-- declared upgrade procedures if any\n");

  cg_schema_emit_baseline_tables_proc(&preamble, &baseline);

  // the drop/create counts decide whether the corresponding CALL is emitted
  // into the upgrade worker further down
  int32_t view_creates = 0, view_drops = 0;
  cg_schema_manage_views(&preamble, &view_drops, &view_creates);

  int32_t index_creates = 0, index_drops = 0;
  cg_schema_manage_indices(&preamble, &index_drops, &index_creates);

  int32_t trigger_creates = 0, trigger_drops = 0;
  cg_schema_manage_triggers(&preamble, &trigger_drops, &trigger_creates);

  if (recreate_items_count) {
    cg_schema_manage_recreate_tables(&preamble, &decls, recreates, recreate_items_count);
  }

  bool_t has_temp_schema = cg_schema_emit_temp_schema_proc(&preamble);
  bool_t one_time_drop_needed = false;

  // code to read the facets into the hash table

  bprintf(&preamble, "@attribute(cql:private)\n");
  bprintf(&preamble, "CREATE PROCEDURE %s_setup_facets()\n", global_proc_name);
  bprintf(&preamble, "BEGIN\n");
  bprintf(&preamble, "  BEGIN TRY\n");
  bprintf(&preamble, "    SET %s_facets := cql_facets_new();\n", global_proc_name);
  bprintf(&preamble, "    DECLARE C CURSOR FOR SELECT * from %s_cql_schema_facets;\n", global_proc_name);
  bprintf(&preamble, "    LOOP FETCH C\n");
  bprintf(&preamble, "    BEGIN\n");
  bprintf(&preamble, "      LET added := cql_facet_add(%s_facets, C.facet, C.version);\n", global_proc_name);
  bprintf(&preamble, "    END;\n");
  bprintf(&preamble, "  END TRY;\n");
  bprintf(&preamble, "  BEGIN CATCH\n");
  bprintf(&preamble, "   -- if table doesn't exist we just have empty facets, that's ok\n");
  bprintf(&preamble, "  END CATCH;\n");
  bprintf(&preamble, "END;\n\n");

  // the main upgrade worker

  bprintf(&main, "\n@attribute(cql:private)\n");
  bprintf(&main, "CREATE PROCEDURE %s_perform_upgrade_steps()\n", global_proc_name);
  bprintf(&main, "BEGIN\n");
  bprintf(&main, "  DECLARE schema_version LONG INTEGER NOT NULL;\n");

  if (view_drops) {
    bprintf(&main, "    -- dropping all views --\n");
    bprintf(&main, "    CALL %s_cql_drop_all_views();\n\n", global_proc_name);
  }

  if (index_drops) {
    bprintf(&main, "    -- dropping condemned or changing indices --\n");
    bprintf(&main, "    CALL %s_cql_drop_all_indices();\n\n", global_proc_name);
  }

  if (trigger_drops) {
    bprintf(&main, "    -- dropping condemned or changing triggers --\n");
    bprintf(&main, "    CALL %s_cql_drop_all_triggers();\n\n", global_proc_name);
  }

  // the baseline install is only emitted if there is baseline text and the
  // script is not restricted to a minimum schema version
  if (baseline.used > 1 && options.min_schema_version == 0) {
    llint_t baseline_crc = (llint_t)crc_charbuf(&baseline);
    bprintf(&main, "    ---- install baseline schema if needed ----\n\n");
    bprintf(&main, "    CALL %s_cql_get_version_crc(0, schema_version);\n", global_proc_name);
    bprintf(&main, "    IF schema_version != %lld THEN\n", baseline_crc);
    bprintf(&main, "      CALL %s_cql_install_baseline_schema();\n", global_proc_name);
    bprintf(&main, "      CALL %s_cql_set_version_crc(0, %lld);\n", global_proc_name, baseline_crc);
    bprintf(&main, "    END IF;\n\n");
  }

  uint32_t prev_version = 0;

  for (size_t i = 0; i < schema_items_count; i++) {
    schema_annotation *note = &notes[i];

    ast_node *version_annotation = note->annotation_ast;

    uint32_t type = note->annotation_type;
    Contract(type >= SCHEMA_ANNOTATION_FIRST && type <= SCHEMA_ANNOTATION_LAST);

    Contract(is_ast_version_annotation(version_annotation));
    EXTRACT_OPTION(vers, version_annotation->left);

    Invariant(note->version == vers);
    Invariant(vers > 0);  // already verified to be positive

    if (vers < options.min_schema_version) {
      continue;
    }

    // a change of version closes out the work accumulated for the previous one;
    // the cast is safe because vers was just verified to be positive
    if (prev_version != (uint32_t)vers) {
      cg_schema_end_version(&main, &upgrade, &pending, prev_version, &version_bits);
      prev_version = (uint32_t)vers;
    }

    CSTR target_name = note->target_name;

    Invariant(type >= SCHEMA_ANNOTATION_FIRST && type <= SCHEMA_ANNOTATION_LAST);

    if (!include_from_region(note->target_ast->sem->region, SCHEMA_TO_UPGRADE)) {
      continue;
    }

    // no schema maintenance for blob storage tables, they aren't physical tables
    if (is_ast_create_table_stmt(note->target_ast) && is_table_blob_storage(note->target_ast)) {
      continue;
    }

    // set by the unsub/resub cases; suppresses the generic migration proc call below
    bool_t subscription_management = false;

    switch (type) {
      case SCHEMA_ANNOTATION_CREATE_COLUMN: {

        if (note->target_ast->sem->sem_type & SCHEMA_FLAG_UNSUB) {
          // do not emit the alter table add column if we are
          // currently unsubscribed, note that resub happens AFTER
          // CREATE COLUMN and UNSUB happens before, this is important!
          continue;
        }

        ast_node *def = note->column_ast;
        Contract(is_ast_col_def(def));
        EXTRACT_NOTNULL(col_def_type_attrs, def->left);
        EXTRACT_NOTNULL(col_def_name_type, col_def_type_attrs->left);
        EXTRACT_STRING(col_name, col_def_name_type->left);

        CSTR col_type = coretype_string(def->sem->sem_type);
        gen_sql_callbacks callbacks;
        init_gen_sql_callbacks(&callbacks);
        callbacks.mode = gen_mode_no_annotations;

        CHARBUF_OPEN(sql_out);
        gen_set_output_buffer(&sql_out);
        // no-op callbacks still suppress @create/@delete which is not legal in alter table
        gen_col_def_with_callbacks(def, &callbacks);

        bprintf(&upgrade, "      -- altering table %s to add column %s %s;\n\n",
          target_name,
          col_name,
          col_type);
        bprintf(&upgrade, "      IF NOT %s_check_column_exists('%s', '*[( ]%s %s*') THEN \n",
          global_proc_name,
          target_name,
          col_name,
          col_type);
        bprintf(&upgrade, "        ALTER TABLE %s ADD COLUMN %s;\n",
          target_name,
          sql_out.ptr);
        bprintf(&upgrade, "      END IF;\n\n");

        CHARBUF_CLOSE(sql_out);
        break;
      }

      case SCHEMA_ANNOTATION_DELETE_COLUMN: {
        ast_node *def = note->column_ast;
        Contract(is_ast_col_def(def));
        EXTRACT_NOTNULL(col_def_type_attrs, def->left);
        EXTRACT_NOTNULL(col_def_name_type, col_def_type_attrs->left);
        EXTRACT_STRING(col_name, col_def_name_type->left);

        bprintf(&upgrade, "      -- logical delete of column %s from %s; -- no ddl\n\n", col_name, target_name);
        break;
      }

      case SCHEMA_ANNOTATION_CREATE_TABLE: {
        // check for one time drop

        EXTRACT_ANY(dot, version_annotation->right);
        if (dot && is_ast_dot(dot)) {
          EXTRACT_STRING(lhs, dot->left);
          EXTRACT_STRING(rhs, dot->right);

          if (!Strcasecmp(lhs, "cql") && !Strcasecmp(rhs, "from_recreate")) {
            bprintf(&upgrade, "      -- one time drop %s\n\n", target_name);
            bprintf(&upgrade, "      CALL %s_cql_one_time_drop('%s', %d);\n\n", global_proc_name, target_name, vers);
            one_time_drop_needed = true;
          }
        }

        bprintf(&upgrade, "      -- creating table %s\n\n", target_name);

        gen_sql_callbacks callbacks;
        init_gen_sql_callbacks(&callbacks);
        callbacks.col_def_callback = cg_suppress_new_col_def;
        callbacks.if_not_exists_callback = cg_schema_force_if_not_exists;
        callbacks.mode = gen_mode_no_annotations;

        CHARBUF_OPEN(sql_out);
        gen_set_output_buffer(&sql_out);
        gen_statement_with_callbacks(note->target_ast, &callbacks);  // only the original columns

        bindent(&upgrade, &sql_out, 6);
        bprintf(&upgrade, ";\n\n");

        CHARBUF_CLOSE(sql_out);
        break;
      }

      case SCHEMA_ANNOTATION_DELETE_TABLE:
        // drops are collected separately and emitted as their own proc after the loop
        bprintf(&drops, "  DROP TABLE IF EXISTS %s;\n", target_name);
        break;

      // Note: @create is invalid for INDEX/VIEW/TRIGGER so there can be no such annotation

      case SCHEMA_ANNOTATION_DELETE_INDEX:
      case SCHEMA_ANNOTATION_DELETE_VIEW:
      case SCHEMA_ANNOTATION_DELETE_TRIGGER:
        // no annotation based actions other than migration proc (handled below)
        Contract(version_annotation->right);
        bprintf(&upgrade, "      -- delete migration proc for %s will run\n\n", target_name);
        break;

      case SCHEMA_ANNOTATION_UNSUB:
        // @recreate tables do not need unsub, they will just delete like they usually do
        // annotation not generated for such cases as it would be a no-op anyway
        Invariant(!note->target_ast->sem->recreate);

        // unsub demands a drop
        bprintf(&upgrade, "      -- unsubscription of %s\n\n", target_name);
        bprintf(&upgrade, "      DROP TABLE IF EXISTS %s;\n\n", target_name);

        // current status: unsubscribed
        note->target_ast->sem->sem_type |= SCHEMA_FLAG_UNSUB;
        subscription_management = true;
        break;

      case SCHEMA_ANNOTATION_RESUB: {
        // @recreate tables do not need resub, they will just delete like they usually do
        // annotation not generated for such cases as it would be a no-op anyway
        Invariant(!note->target_ast->sem->recreate);

        // emit a create if not exists at this version
        // note that we do not (!) emit a drop here because it's possible that
        // something else will be later added to this schema rev causing it
        // to re-run and we want it to be idempotent

        bprintf(&upgrade, "      -- resubscribe to %s\n\n", target_name);

        gen_sql_callbacks callbacks;
        init_gen_sql_callbacks(&callbacks);
        callbacks.col_def_callback = cg_suppress_col_def_by_version;
        callbacks.col_def_context = &note->version;
        callbacks.if_not_exists_callback = cg_schema_force_if_not_exists;
        callbacks.mode = gen_mode_sql;
        callbacks.long_to_int_conv = true;

        CHARBUF_OPEN(sql_out);
        gen_set_output_buffer(&sql_out);
        // only the columns as of the current version
        gen_statement_with_callbacks(note->target_ast, &callbacks);

        bindent(&upgrade, &sql_out, 6);
        bprintf(&upgrade, ";\n\n");
        CHARBUF_CLOSE(sql_out);

        // current status: subscribed
        note->target_ast->sem->sem_type &= sem_not(SCHEMA_FLAG_UNSUB);
        subscription_management = true;
        break;
      }

      case SCHEMA_ANNOTATION_AD_HOC:
        // no annotation based actions other than migration proc (handled below)
        Contract(version_annotation->right);
        bprintf(&upgrade, "      -- ad hoc migration proc %s will run\n\n", target_name);
        break;
    }

    // handle any migration proc for any annotation
    if (!subscription_management && version_annotation->right) {
      // call any non-builtin migrations the generic way, builtins get whatever special handling they need
      if (!is_ast_dot(version_annotation->right)) {
        EXTRACT_STRING(proc, version_annotation->right);
        bprintf(&pending, "      IF cql_facet_find(%s_facets, '%s') = -1 THEN\n", global_proc_name, proc);
        bprintf(&pending, "        CALL %s();\n", proc);
        bprintf(&pending, "        CALL %s_cql_set_facet_version('%s', %d);\n", global_proc_name, proc, vers);
        bprintf(&pending, "      END IF;\n");
        bprintf(&decls, "DECLARE PROC %s() USING TRANSACTION;\n", proc);
      }
    }
  }

  // close out the last version that was in progress
  cg_schema_end_version(&main, &upgrade, &pending, prev_version, &version_bits);

  if (drops.used > 1) {
    bprintf(&main, "    CALL %s_cql_drop_tables();\n", global_proc_name);

    bprintf(&preamble, "@attribute(cql:private)\n");
    bprintf(&preamble, "CREATE PROC %s_cql_drop_tables()\n", global_proc_name);
    bprintf(&preamble, "BEGIN\n");
    bprintf(&preamble, "%s", drops.ptr);
    bprintf(&preamble, "END;\n");
  }

  if (recreate_items_count) {
    bprintf(&main, "    CALL %s_cql_recreate_tables();\n", global_proc_name);
  }

  if (view_creates) {
    bprintf(&main, "    CALL %s_cql_create_all_views();\n", global_proc_name);
  }

  if (index_creates) {
    bprintf(&main, "    CALL %s_cql_create_all_indices();\n", global_proc_name);
  }

  if (trigger_creates) {
    bprintf(&main, "    CALL %s_cql_create_all_triggers();\n", global_proc_name);
  }

  // build a VALUES list of every version in [1, prev_version] whose bit is
  // not set in version_bits
  CHARBUF_OPEN(missing_versions);
  for (uint32_t v = 1; v <= prev_version; v++) {
    uint32_t byteIndex = v / 8;
    uint32_t bitMask = 1u << (v % 8);
    if (version_bits.used > byteIndex && ((uint32_t)(version_bits.ptr[byteIndex]) & bitMask)) {
      continue;
    }

    if (missing_versions.used > 1) {
      bprintf(&missing_versions, ",");
    }
    bprintf(&missing_versions, "(%u)", v);
  }

  if (missing_versions.used > 1) {
    bprintf(&main, "    CALL %s_cleanup_unused_versions();\n", global_proc_name);

    bprintf(&preamble, "\n@attribute(cql:private)\n");
    bprintf(&preamble, "CREATE PROC %s_cleanup_unused_versions()\n", global_proc_name);
    bprintf(&preamble, "BEGIN\n");
    bprintf(&preamble, "  WITH\n");
    bprintf(&preamble, "    V(v) AS (VALUES %s),\n", missing_versions.ptr);
    bprintf(&preamble, "    F(f) AS (SELECT 'cql_schema_v'||v from V)\n");
    bprintf(&preamble, "  DELETE FROM %s_cql_schema_facets WHERE facet IN (SELECT f from F);\n", global_proc_name);
    bprintf(&preamble, "END;\n");
  }

  CHARBUF_CLOSE(missing_versions);

  // prev_version is unsigned, so it is formatted with %u
  bprintf(&main, "    CALL %s_cql_set_facet_version('cql_schema_version', %u);\n", global_proc_name, prev_version);
  bprintf(&main, "    CALL %s_cql_set_facet_version('cql_schema_crc', %lld);\n", global_proc_name, schema_crc);
  bprintf(&main, "END;\n\n");

  // reports the stored schema version and the version this script would install

  bprintf(&main, "CREATE PROCEDURE %s_get_current_and_proposed_versions(\n", global_proc_name);
  bprintf(&main, "    out current long not null,\n");
  bprintf(&main, "    out proposed long not null\n");
  bprintf(&main, "    )\n");
  bprintf(&main, "BEGIN\n");
  bprintf(&main, "    SET current := %s_cql_get_facet_version('cql_schema_version');\n", global_proc_name);
  bprintf(&main, "    SET proposed := %d;\n", max_schema_version);
  bprintf(&main, "END;\n");

  // downgrade check, then the upgrade steps, then the facet diff as the result set

  bprintf(&main, "@attribute(cql:private)\n");
  bprintf(&main, "CREATE PROCEDURE %s_perform_needed_upgrades()\n", global_proc_name);
  bprintf(&main, "BEGIN\n");
  bprintf(&main, "  -- check for downgrade --\n");
  bprintf(&main, "  IF cql_facet_find(%s_facets, 'cql_schema_version') > %d THEN\n", global_proc_name, max_schema_version);
  bprintf(&main, "    SELECT 'downgrade detected' facet;\n");
  bprintf(&main, "  ELSE\n");
  bprintf(&main, "    -- save the current facets so we can diff them later --\n");
  bprintf(&main, "    CALL %s_save_cql_schema_facets();\n", global_proc_name);
  bprintf(&main, "    CALL %s_perform_upgrade_steps();\n\n", global_proc_name);
  bprintf(&main, "    -- finally produce the list of differences\n");
  bprintf(&main, "    SELECT T1.facet FROM\n");
  bprintf(&main, "      %s_cql_schema_facets T1\n", global_proc_name);
  bprintf(&main, "      LEFT OUTER JOIN %s_cql_schema_facets_saved T2\n", global_proc_name);
  bprintf(&main, "        ON T1.facet = T2.facet\n");
  bprintf(&main, "      WHERE T1.version is not T2.version;\n");
  bprintf(&main, "  END IF;\n");
  bprintf(&main, "END;\n\n");

  // the global upgrade proc: the upgrade only runs if the stored crc differs

  bprintf(&main, "CREATE PROCEDURE %s()\n", global_proc_name);
  bprintf(&main, "BEGIN\n");
  bprintf(&main, "  DECLARE schema_crc LONG INTEGER NOT NULL;\n");
  bprintf(&main, "\n");
  bprintf(&main, "  -- create schema facets information table --\n");
  bprintf(&main, "  CALL %s_create_cql_schema_facets_if_needed();\n\n", global_proc_name);
  bprintf(&main, "  -- fetch the last known schema crc, if it's different do the upgrade --\n");
  bprintf(&main, "  CALL %s_cql_get_facet_version('cql_schema_crc', schema_crc);\n\n", global_proc_name);
  bprintf(&main, "  IF schema_crc <> %lld THEN\n", (llint_t)schema_crc);
  bprintf(&main, "    BEGIN TRY\n");
  bprintf(&main, "      CALL %s_setup_facets();\n", global_proc_name);
  bprintf(&main, "      CALL %s_perform_needed_upgrades();\n", global_proc_name);
  bprintf(&main, "    END TRY;\n");
  bprintf(&main, "    BEGIN CATCH\n");
  bprintf(&main, "      CALL cql_facets_delete(%s_facets);\n", global_proc_name);
  bprintf(&main, "      SET %s_facets := 0;\n", global_proc_name);
  bprintf(&main, "      THROW;\n");
  bprintf(&main, "    END CATCH;\n");
  bprintf(&main, "    CALL cql_facets_delete(%s_facets);\n", global_proc_name);
  bprintf(&main, "    SET %s_facets := 0;\n", global_proc_name);
  bprintf(&main, "  ELSE\n");
  bprintf(&main, "    -- some canonical result for no differences --\n");
  bprintf(&main, "    SELECT 'no differences' facet;\n");
  bprintf(&main, "  END IF;\n");

  if (has_temp_schema) {
    bprintf(&main, "  ---- install temp schema after upgrade is complete ----\n");
    bprintf(&main, "  CALL %s_cql_install_temp_schema();\n\n", global_proc_name);
  }

  bprintf(&main, "END;\n\n");

  if (one_time_drop_needed) {
    cg_schema_emit_one_time_drop(&decls);
  }

  // final layout: declarations, then helper procs, then the main procs
  CHARBUF_OPEN(output_file);
  bprintf(&output_file, "%s\n", decls.ptr);
  bprintf(&output_file, "%s", preamble.ptr);
  bprintf(&output_file, "%s", main.ptr);

  cql_write_file(options.file_names[0], output_file.ptr);

  CHARBUF_CLOSE(output_file);

  // buffers are closed in the reverse of the order in which they were opened
  CHARBUF_CLOSE(drops);
  CHARBUF_CLOSE(baseline);
  CHARBUF_CLOSE(upgrade);
  CHARBUF_CLOSE(pending);
  CHARBUF_CLOSE(decls);
  CHARBUF_CLOSE(main);
  CHARBUF_CLOSE(preamble);

  bytebuf_close(&version_bits);
}