shcore::Value Create_cluster::execute()

in modules/adminapi/dba/create_cluster.cc [748:1167]


/**
 * Executes the createCluster operation on the target instance.
 *
 * Steps performed, in order:
 *  - installs the Group Replication plugin and (for Replica Clusters) resets
 *    the GR member actions, unless running in dry-run mode;
 *  - unless adopting from an existing group: tags the instance with the
 *    "cluster setup" indicator, optionally creates the recovery account
 *    up-front (communicationStack 'MySQL') and starts Group Replication;
 *  - creates/opens the Metadata, inserts the Cluster record and its
 *    attributes inside a Metadata transaction;
 *  - registers the seed instance (or adopts the existing group), sets up the
 *    recovery account and handles the clone plugin options;
 *  - commits the Metadata transaction and cancels the pending undo actions.
 *
 * If any step inside the guarded section throws, the registered undo actions
 * are executed, the indicator tag is dropped and the original exception is
 * re-thrown.
 *
 * NOTE(review): m_options.dry_run is only checked for the plugin
 * installation, the member actions reset and the "retry" clean-up; all other
 * steps run unconditionally. Confirm callers never get that far when dry-run
 * is enabled.
 *
 * @return a shcore::Value wrapping the newly created Cluster object.
 */
shcore::Value Create_cluster::execute() {
  auto console = mysqlsh::current_console();
  console->print_info("Creating InnoDB Cluster '" + m_cluster_name + "' on '" +
                      m_target_instance->descr() + "'...\n");

  //  Common informative logging
  if (m_options.get_adopt_from_gr()) {
    log_info(
        "Adopting cluster from existing Group Replication and using its "
        "settings.");
  } else {
    log_used_gr_options();
  }

  // Make sure the GR plugin is installed (only installed if needed).
  // NOTE: An error is issued if it fails to be installed (e.g., DISABLED).
  //       Disable read-only temporarily to install the plugin if needed.
  if (!m_options.dry_run) {
    mysqlshdk::gr::install_group_replication_plugin(*m_target_instance,
                                                    nullptr);
  }

  // Reset GR member actions to avoid BUG#34464526 "Missing
  // mysql_start_failover_channels_if_primary after upgrade to 8.0.27+"
  //
  // When a Cluster is upgraded from 8.0.26 to 8.0.27+, every instance that is
  // taken out and rejoined back will have the list of available member
  // actions reduced to the ones available in 8.0.26 (the current primary).
  // In this particular scenario, the action
  // 'mysql_start_failover_channels_if_primary' is removed since it's only
  // available in 8.0.27+.
  //
  // As soon as the primary is taken out, upgraded, and rejoined, the member
  // actions available in the group are the "reduced" ones, meaning
  // 'mysql_start_failover_channels_if_primary' is not available.
  //
  // This results in an error while creating a Replica Cluster since that
  // action is configured and enabled. Resetting the member actions beforehand
  // ensures this issue is not seen.
  if (!m_options.dry_run && m_create_replica_cluster) {
    try {
      mysqlshdk::gr::reset_member_actions(*m_target_instance);
    } catch (...) {
      // Log for diagnostics only; the failure is still fatal.
      log_error("Error resetting Group Replication member actions at %s: %s",
                m_target_instance->descr().c_str(),
                format_active_exception().c_str());
      throw;
    }
  }

  if (*m_options.multi_primary && !m_options.get_adopt_from_gr()) {
    console->print_info(
        "The MySQL InnoDB Cluster is going to be setup in advanced "
        "Multi-Primary Mode. Consult its requirements and limitations in "
        "https://dev.mysql.com/doc/refman/en/"
        "group-replication-limitations.html");
  }

  // Get the current value of group_replication_transaction_size_limit
  // (read before GR is started; used later for Replica Clusters)
  int64_t original_transaction_size_limit =
      m_target_instance->get_sysvar_int(kGrTransactionSizeLimit).value_or(0);

  // Actions to revert the changes done so far. Callbacks are pushed to the
  // front as the operation progresses and are executed by the catch handler
  // below if anything fails.
  shcore::Scoped_callback_list undo_list;

  shcore::Value ret_val;
  try {
    console->print_info("Adding Seed Instance...");

    if (!m_options.get_adopt_from_gr()) {
      if (m_retrying && !m_options.dry_run) {
        // If we're retrying, make sure GR is stopped
        console->print_note("Stopping GR started by a previous failed attempt");
        mysqlshdk::gr::stop_group_replication(*m_target_instance);
        // stop GR will leave the server in SRO mode
        m_target_instance->set_sysvar("super_read_only", false);
      }

      // Set a guard to indicate that a createCluster() has started.
      // This has to happen before start GR, otherwise we can't tell whether
      // a running GR was started by us or the user.
      // This hack is used as an indicator that createCluster is running in an
      // instance. If the instance already has master_user set to this value
      // when we enter createCluster(), it means a previous attempt failed.
      // Note: this shouldn't be set if we're adopting
      assert(!m_options.get_adopt_from_gr());
      mysqlshdk::mysql::create_indicator_tag(
          *m_target_instance, metadata::kClusterSetupIndicatorTag);

      // When using the communicationStack 'MySQL' the recovery channel must be
      // set-up before starting GR because GR requires that the recovery channel
      // is configured in order to obtain the credentials information
      if (m_options.gr_options.communication_stack.value_or("") ==
          kCommunicationStackMySQL) {
        // Create the Recovery account and configure GR's recovery channel.
        // If creating a Replica Cluster, suppress the binary log otherwise
        // we're creating an errant transaction in the Replica Cluster.
        // When a standalone Cluster we must not suppress the binary log
        // otherwise the transaction will never be replicated to the Cluster
        // members

        // Check if super_read_only is enabled. If so it must be disabled to
        // create the account. This might happen if super_read_only is persisted
        // and we're creating a Replica Cluster
        if (m_target_instance->get_sysvar_bool("super_read_only", false)) {
          m_target_instance->set_sysvar("super_read_only", false);
        }

        std::string repl_account;

        if (m_create_replica_cluster) {
          mysqlshdk::mysql::Suppress_binary_log nobinlog(
              m_target_instance.get());

          create_recovery_account(m_target_instance.get(),
                                  m_target_instance.get(), nullptr,
                                  &repl_account);
        } else {
          create_recovery_account(m_primary_master.get(),
                                  m_target_instance.get(), nullptr,
                                  &repl_account);
        }

        // Drop the recovery account if GR fails to start (or any later step
        // fails). The account name is captured by value since the local
        // goes out of scope before the undo list may be executed.
        undo_list.push_front([this, repl_account]() {
          std::string repl_account_host =
              m_options.replication_allowed_host.empty()
                  ? "%"
                  : m_options.replication_allowed_host;
          m_primary_master->drop_user(repl_account, repl_account_host);
        });
      }

      // GR has to be stopped if this fails
      undo_list.push_front([this, console]() {
        log_info("revert: Stopping group_replication");
        try {
          mysqlshdk::gr::stop_group_replication(*m_target_instance);
        } catch (const shcore::Error &e) {
          console->print_warning(
              "Could not stop group_replication while aborting changes: " +
              std::string(e.what()));
        }
        log_info("revert: Clearing super_read_only");
        try {
          m_target_instance->set_sysvar("super_read_only", false);
        } catch (const shcore::Error &e) {
          console->print_warning(
              "Could not clear super_read_only while aborting changes: " +
              std::string(e.what()));
        }
      });

      // Any certificate based member authentication type requires
      // certificates to be used when starting the cluster.
      bool requires_certificates{false};
      switch (m_options.member_auth_options.member_auth_type) {
        case mysqlsh::dba::Replication_auth_type::CERT_ISSUER:
        case mysqlsh::dba::Replication_auth_type::CERT_SUBJECT:
        case mysqlsh::dba::Replication_auth_type::CERT_ISSUER_PASSWORD:
        case mysqlsh::dba::Replication_auth_type::CERT_SUBJECT_PASSWORD:
          requires_certificates = true;
          break;
        default:
          break;
      }

      // Start GR before creating the recovery user and metadata, so that
      // all transactions executed by us have the same GTID prefix
      // NOTE: When the communicationStack chosen is "MySQL", there's one
      // transaction that will have a different GTID prefix that is the "CREATE
      // USER" (see above)
      mysqlsh::dba::start_cluster(*m_target_instance, m_options.gr_options,
                                  requires_certificates,
                                  m_options.multi_primary, m_cfg.get());
    }

    std::string group_name = m_target_instance->get_group_name();

    // Remove the Metadata from the target instance if a later step fails
    undo_list.push_front([this]() { metadata::uninstall(m_target_instance); });

    std::shared_ptr<MetadataStorage> metadata;
    if (!m_metadata) {
      // Create the Metadata Schema.
      prepare_metadata_schema();

      // Open a separate metadata connection to not mix MD transactions and
      // other auto-committing operations on the same session

      auto ipool = current_ipool();

      auto inst =
          ipool->connect_unchecked(m_target_instance->get_connection_options());

      metadata = std::make_shared<MetadataStorage>(inst);
    } else {
      // A Metadata object was supplied: reuse it
      metadata = m_metadata;
    }

    // Create the cluster and insert it into the Metadata.

    std::string domain_name;
    std::string cluster_name;
    parse_fully_qualified_cluster_name(m_cluster_name, &domain_name, nullptr,
                                       &cluster_name);
    if (domain_name.empty()) domain_name = k_default_domain_name;

    mysqlshdk::gr::Topology_mode topology_mode =
        (*m_options.multi_primary)
            ? mysqlshdk::gr::Topology_mode::MULTI_PRIMARY
            : mysqlshdk::gr::Topology_mode::SINGLE_PRIMARY;

    auto cluster_impl = std::make_shared<Cluster_impl>(
        cluster_name, group_name, m_target_instance, m_primary_master, metadata,
        topology_mode);

    // Update the properties
    // For V1.0, let's set the Cluster's description to "default"
    cluster_impl->set_description("Default Cluster");

    // Insert Cluster (and ReplicaSet) on the Metadata Schema.
    //
    // The transaction object is shared with the undo callback (captured by
    // value) rather than referenced: the undo list is executed from the catch
    // handler below, i.e. after all locals of this try block have already
    // been destroyed by stack unwinding, so a by-reference capture of a local
    // transaction would dangle.
    auto trx = std::make_shared<MetadataStorage::Transaction>(metadata);
    undo_list.push_front([trx]() { trx->rollback(); });

    metadata->create_cluster_record(cluster_impl.get(),
                                    m_options.get_adopt_from_gr(), false);

    metadata->update_cluster_attribute(
        cluster_impl->get_id(), k_cluster_attribute_replication_allowed_host,
        m_options.replication_allowed_host.empty()
            ? shcore::Value("%")
            : shcore::Value(m_options.replication_allowed_host));

    // Store the communicationStack in the Metadata as a Cluster capability
    // TODO(miguel): build and add the list of allowed operations
    metadata->update_cluster_capability(
        cluster_impl->get_id(), kCommunicationStack,
        m_options.gr_options.communication_stack.value_or(""), {});

    metadata->update_cluster_attribute(
        cluster_impl->get_id(), k_cluster_attribute_assume_gtid_set_complete,
        m_options.clone_options.gtid_set_is_complete ? shcore::Value::True()
                                                     : shcore::Value::False());

    metadata->update_cluster_attribute(
        cluster_impl->get_id(), k_cluster_attribute_manual_start_on_boot,
        m_options.gr_options.manual_start_on_boot.value_or(false)
            ? shcore::Value::True()
            : shcore::Value::False());

    metadata->update_cluster_attribute(cluster_impl->get_id(),
                                       k_cluster_attribute_default,
                                       shcore::Value::True());

    if (m_options.gr_options.view_change_uuid) {
      metadata->update_cluster_attribute(
          cluster_impl->get_id(), "group_replication_view_change_uuid",
          shcore::Value(m_options.gr_options.view_change_uuid.value_or("")));
    }

    // Store in the Metadata the value of
    // group_replication_transaction_size_limit
    if (m_create_replica_cluster) {
      // Replica Cluster: store the value read before GR was started
      metadata->update_cluster_attribute(
          cluster_impl->get_id(), k_cluster_attribute_transaction_size_limit,
          shcore::Value(original_transaction_size_limit));
    } else {
      // Get and store the current value. If the option was used, the value was
      // already set by now so we're good
      metadata->update_cluster_attribute(
          cluster_impl->get_id(), k_cluster_attribute_transaction_size_limit,
          shcore::Value(
              m_target_instance->get_sysvar_int(kGrTransactionSizeLimit)
                  .value_or(0)));
    }

    // Insert instance into the metadata.
    if (m_options.get_adopt_from_gr()) {
      // update recovery auth attributes
      cluster_impl->get_metadata_storage()->update_cluster_attribute(
          cluster_impl->get_id(), k_cluster_attribute_member_auth_type,
          shcore::Value(
              to_string(m_options.member_auth_options.member_auth_type)));

      // Adoption from an existing GR group is performed after creating/updating
      // the metadata (since it is used internally by adopt_from_gr()).
      cluster_impl->adopt_from_gr();

      // Reset recovery channel in all instances to use our own account
      reset_recovery_all(cluster_impl.get());

      persist_sro_all(cluster_impl.get());
    } else {
      // Check if instance address already belong to cluster (metadata).
      // TODO(alfredo) - this check seems redundant?
      bool is_instance_on_md =
          cluster_impl->contains_instance_with_address(m_address_in_metadata);

      log_debug("Cluster %s: Instance '%s' %s",
                cluster_impl->get_name().c_str(),
                m_target_instance->descr().c_str(),
                is_instance_on_md ? "is already in the Metadata."
                                  : "is being added to the Metadata...");

      // If the instance is not in the Metadata, we must add it.
      if (!is_instance_on_md) {
        cluster_impl->add_metadata_for_instance(
            m_target_instance->get_connection_options());
      }

      // update recovery auth attributes
      cluster_impl->get_metadata_storage()->update_cluster_attribute(
          cluster_impl->get_id(), k_cluster_attribute_member_auth_type,
          shcore::Value(
              to_string(m_options.member_auth_options.member_auth_type)));
      cluster_impl->get_metadata_storage()->update_cluster_attribute(
          cluster_impl->get_id(), k_cluster_attribute_cert_issuer,
          shcore::Value(m_options.member_auth_options.cert_issuer));

      cluster_impl->get_metadata_storage()->update_instance_attribute(
          m_target_instance->get_uuid(), k_instance_attribute_cert_subject,
          shcore::Value(m_options.member_auth_options.cert_subject));

      // Create the recovery account and update the Metadata
      // NOTE: If the Cluster is a REPLICA the account must be created at the
      // PRIMARY Cluster too
      if (m_create_replica_cluster ||
          !m_options.gr_options.communication_stack.has_value() ||
          (m_options.gr_options.communication_stack.value() ==
           kCommunicationStackXCom)) {
        std::string repl_account;
        // The account is created through the primary (m_primary_master), the
        // same instance the undo action below drops it from, while the
        // recovery channel is configured on the target instance.
        create_recovery_account(m_primary_master.get(),
                                m_target_instance.get(), cluster_impl.get(),
                                &repl_account);

        // Drop the recovery account if any of the remaining steps fails
        undo_list.push_front([this, repl_account]() {
          std::string repl_account_host =
              m_options.replication_allowed_host.empty()
                  ? "%"
                  : m_options.replication_allowed_host;
          m_primary_master->drop_user(repl_account, repl_account_host);
        });

        store_recovery_account_metadata(m_target_instance.get(), *cluster_impl);
      } else {
        // The recovery account was already created, just update the Metadata:
        // Standalone cluster using MySQL comm stack
        store_recovery_account_metadata(m_target_instance.get(), *cluster_impl);
      }
    }

    // Handle the clone options
    {
      // If disableClone: true, uninstall the plugin and update the Metadata
      if (m_options.clone_options.disable_clone.value_or(false)) {
        mysqlshdk::mysql::uninstall_clone_plugin(*m_target_instance, nullptr);
        cluster_impl->set_disable_clone_option(
            *m_options.clone_options.disable_clone);
      } else {
        // Install the plugin if the target instance is >= 8.0.17
        if (supports_mysql_clone(m_target_instance->get_version())) {
          // Try to install the plugin, if it fails or it's disabled, then
          // disable the clone usage on the cluster but proceed
          try {
            mysqlshdk::mysql::install_clone_plugin(*m_target_instance, nullptr);
          } catch (const std::exception &e) {
            console->print_warning(
                "Failed to install/activate the clone plugin: " +
                std::string(e.what()));
            console->print_info("Disabling the clone usage on the cluster.");
            console->print_info();

            cluster_impl->set_disable_clone_option(true);
          }
        } else {
          // If the target instance is < 8.0.17 it does not support clone, so we
          // must disable it by default
          cluster_impl->set_disable_clone_option(true);
        }
      }
    }

    // Everything after this should not affect the result of the createCluster()
    // even if an exception is thrown.
    trx->commit();
    undo_list.cancel();

    // If it reaches here, it means there are no exceptions
    std::shared_ptr<Cluster> cluster = std::make_shared<Cluster>(cluster_impl);

    ret_val =
        shcore::Value(std::static_pointer_cast<shcore::Object_bridge>(cluster));
  } catch (...) {
    // Best-effort revert: errors raised while reverting are reported but
    // never replace the original exception, which is re-thrown below.
    try {
      log_info("createCluster() failed: Trying to revert changes...");

      undo_list.call();

      mysqlshdk::mysql::drop_indicator_tag(*m_target_instance,
                                           metadata::kClusterSetupIndicatorTag);
    } catch (const std::exception &e) {
      console->print_error("Error aborting changes: " + std::string(e.what()));
    }
    throw;
  }

  // Success: the "createCluster in progress" guard is no longer needed
  mysqlshdk::mysql::drop_indicator_tag(*m_target_instance,
                                       metadata::kClusterSetupIndicatorTag);

  std::string message =
      m_options.get_adopt_from_gr()
          ? "Cluster successfully created based on existing replication group."
          : "Cluster successfully created. Use Cluster.<<<addInstance>>>() to "
            "add MySQL instances.\nAt least 3 instances are needed for the "
            "cluster to be able to withstand up to\none server failure.";
  console->print_info(message);
  console->print_info();

  // we can already release the locks on the instances
  m_instance_locks.invoke();

  return ret_val;
}