absl::Status SchemaUpdaterImpl::SetColumnDefinition()

in backend/schema/updater/schema_updater.cc [1619:1856]


absl::Status SchemaUpdaterImpl::SetColumnDefinition(
    const ddl::ColumnDefinition& ddl_column, const Table* table,
    const ddl::CreateTable* ddl_create_table,
    const database_api::DatabaseDialect& dialect, bool is_alter,
    ColumnDefModifer* modifier) {
  bool is_generated = false;
  bool has_default_value = false;
  bool is_identity_column = false;
  // Process any changes in column definition.
  ZETASQL_ASSIGN_OR_RETURN(
      const zetasql::Type* column_type,
      DDLColumnTypeToGoogleSqlType(ddl_column, type_factory_,
                                   latest_schema_->proto_bundle().get()));
  modifier->set_type(column_type);

  if (column_type->IsTokenList() && !ddl_column.hidden()) {
    return error::NonHiddenTokenlistColumn(table->Name(),
                                           ddl_column.column_name());
  }

  // For the case of removing a vector length param in ALTER TABLE ALTER COLUMN.
  if (ddl_create_table == nullptr) {
    const Column* column = table->FindColumn(ddl_column.column_name());
    if (column != nullptr && column->has_vector_length() &&
        !ddl_column.has_vector_length()) {
      return error::CannotAlterColumnToRemoveVectorLength(
          ddl_column.column_name());
    }
  }

  // Do not allow a column to convert to and stop being an identity column.
  const Column* old_column = table->FindColumn(ddl_column.column_name());
  if (old_column != nullptr &&
      old_column->is_identity_column() != ddl_column.has_identity_column()) {
    if (ddl_column.has_identity_column()) {
      return error::CannotAlterToIdentityColumn(table->Name(),
                                                ddl_column.column_name());
    } else {
      return error::CannotAlterColumnToDropIdentity(table->Name(),
                                                    ddl_column.column_name());
    }
  }

  absl::flat_hash_set<const SchemaNode*> udf_dependencies;
  absl::flat_hash_set<const SchemaNode*> dependent_sequences;
  if (ddl_column.has_column_default()) {
    std::string expression = ddl_column.column_default().expression();
    if (dialect == database_api::DatabaseDialect::POSTGRESQL) {
      ZETASQL_ASSIGN_OR_RETURN(expression,
                       TranslatePGExpression(ddl_column.column_default(), table,
                                             ddl_create_table, *modifier));
    }
    has_default_value = true;
    modifier->set_expression(expression);

    absl::Status s = AnalyzeColumnDefaultValue(
        expression, ddl_column.column_name(), column_type, table,
        ddl_create_table, &dependent_sequences, &udf_dependencies);

    if (!s.ok()) {
      return error::ColumnDefaultValueParseError(
          table->Name(), ddl_column.column_name(), s.message());
    }
  } else if (ddl_column.has_identity_column()) {
    is_identity_column = true;
    // The default value is the expression
    // `GET_NEXT_SEQUENCE_VALUE(sequence_name)`
    has_default_value = true;
    std::vector<std::string> parts = absl::StrSplit(modifier->get()->id(), ':');
    ZETASQL_RET_CHECK_GE(parts.size(), 2);

    std::string sequence_name = absl::StrFormat(
        "_identity_seq_%s", absl::StrReplaceAll(parts[0], {{".", "__"}}));
    const Sequence* existing_sequence =
        latest_schema_->FindSequence(sequence_name);
    if (is_alter) {
      ZETASQL_RET_CHECK(existing_sequence != nullptr)
          << "sequence does not exist: " << sequence_name;
      ddl::AlterSequence alter_sequence;
      alter_sequence.set_sequence_name(sequence_name);
      SetSequenceOptionsForIdentityColumn(
          ddl_column.identity_column(),
          alter_sequence.mutable_set_options()->mutable_options());
      ZETASQL_RETURN_IF_ERROR(AlterSequence(alter_sequence, existing_sequence));
      dependent_sequences.insert(existing_sequence);
    } else {
      ZETASQL_RET_CHECK(existing_sequence == nullptr)
          << "sequence already exists: " << sequence_name;
      // Create the internal sequence.
      ddl::CreateSequence create_sequence;
      create_sequence.set_sequence_name(sequence_name);
      if (ddl_column.identity_column().has_type() &&
          ddl_column.identity_column().type() ==
              ddl::ColumnDefinition::IdentityColumnDefinition::
                  BIT_REVERSED_POSITIVE) {
        ddl::SetOption* sequence_kind = create_sequence.add_set_options();
        sequence_kind->set_option_name("sequence_kind");
        sequence_kind->set_string_value("bit_reversed_positive");
      } else if (!IsDefaultSequenceKindSet()) {
        return error::UnspecifiedIdentityColumnSequenceKind(
            ddl_column.column_name());
      }
      SetSequenceOptionsForIdentityColumn(
          ddl_column.identity_column(), create_sequence.mutable_set_options());
      ZETASQL_ASSIGN_OR_RETURN(
          const Sequence* sequence,
          CreateSequence(create_sequence, dialect, /*is_internal_use=*/true));

      std::string expression;
      if (dialect == database_api::DatabaseDialect::POSTGRESQL) {
        expression =
            absl::StrFormat("(GET_NEXT_SEQUENCE_VALUE(\"%s\"))", sequence_name);
      } else {
        expression = absl::StrFormat("(GET_NEXT_SEQUENCE_VALUE(SEQUENCE %s))",
                                     sequence_name);
      }
      modifier->set_expression(expression);
      dependent_sequences.insert(sequence);
    }
  } else if (ddl_column.has_generated_column()) {
    std::string expression = ddl_column.generated_column().expression();

    if (dialect == database_api::DatabaseDialect::POSTGRESQL) {
      ZETASQL_ASSIGN_OR_RETURN(expression, TranslatePGExpression(
                                       ddl_column.generated_column(), table,
                                       ddl_create_table, *modifier));
    }
    is_generated = true;
    modifier->set_expression(expression);
    absl::flat_hash_set<std::string> dependent_column_names;
    absl::Status s = AnalyzeGeneratedColumn(
        expression, ddl_column.column_name(), column_type, table,
        ddl_create_table, &dependent_column_names, &udf_dependencies);
    if (!s.ok()) {
      return error::GeneratedColumnDefinitionParseError(
          table->Name(), ddl_column.column_name(), s.message());
    }
    // Create a helper map to check if a column is generated.
    absl::flat_hash_set<std::string> generated_column_set;
    if (dialect == database_api::DatabaseDialect::POSTGRESQL) {
      if (ddl_create_table != nullptr) {
        for (const ddl::ColumnDefinition& column_def :
             ddl_create_table->column()) {
          if (column_def.has_generated_column()) {
            generated_column_set.insert(column_def.column_name());
          }
        }
      } else {
        // This is for altering a table definition.
        for (const Column* column : table->columns()) {
          if (column->is_generated()) {
            generated_column_set.insert(column->Name());
          }
        }
      }
    }
    for (const std::string& column_name : dependent_column_names) {
      if (dialect == database_api::DatabaseDialect::POSTGRESQL &&
          generated_column_set.contains(column_name)) {
        // Check generated column does not reference to generated for
        // PostgreSQL schema.
        return error::DdlInvalidArgumentError(
            absl::Substitute("A generated column \"$0\" cannot reference "
                             "another generated column \"$1\".",
                             ddl_column.column_name(), column_name));
      }
      modifier->add_dependent_column_name(column_name);
    }
    modifier->set_stored(ddl_column.generated_column().stored());
  }

  if (!is_generated && !has_default_value) {
    // Altering a generated column to a non-generated column is disallowed. In
    // that case, the expression is cleared here and later validation at
    // column_validator.cc will block it.
    modifier->clear_expression();
  } else {
    ZETASQL_RET_CHECK(is_generated != has_default_value);
  }

  modifier->set_is_identity_column(is_identity_column);
  modifier->set_has_default_value(has_default_value);
  // Set the default values for nullability and length.
  modifier->set_nullable(!ddl_column.not_null());
  modifier->set_declared_max_length(std::nullopt);
  if (ddl_column.has_length()) {
    modifier->set_declared_max_length(ddl_column.length());
  } else if (ddl_column.type() == ddl::ColumnDefinition::ARRAY &&
             ddl_column.has_array_subtype() &&
             ddl_column.array_subtype().has_length()) {
    modifier->set_declared_max_length(ddl_column.array_subtype().length());
  }
  modifier->set_sequences_used(dependent_sequences);
  modifier->set_udf_dependencies(udf_dependencies);

  if (ddl_column.has_vector_length()) {
    // For the case of adding `vector_length` param in CREATE TABLE and ALTER
    // TABLE ADD COLUMN.
    if (ddl_create_table != nullptr ||
        (ddl_create_table == nullptr &&
         table->FindColumn(ddl_column.column_name()) == nullptr)) {
      modifier->set_vector_length(ddl_column.vector_length());
    } else {
      // For the case of adding or editing `vector_length` param in ALTER TABLE
      // ALTER COLUMN.
      return error::CannotAlterColumnToAddVectorLength(
          ddl_column.column_name());
    }
  }
  if (!ddl_column.set_options().empty()) {
    ZETASQL_RETURN_IF_ERROR(
        SetColumnOptions(ddl_column.set_options(), dialect, modifier));
  }

  if (is_alter) {
    const Column* existing_column = table->FindColumn(ddl_column.column_name());
    if (existing_column == nullptr) {
      return error::ColumnNotFound(table->Name(), ddl_column.column_name());
    }
    absl::flat_hash_set<const SchemaNode*> deps;
    for (const auto& dep : existing_column->udf_dependencies()) {
      deps.insert(dep);
    }
    // Check for a recursive columns by analyzing the transitive set of
    // dependencies, i.e., if the view is a dependency of itself.
    auto transitive_deps = GatherTransitiveDependenciesForSchemaNode(deps);
    if (std::find_if(transitive_deps.begin(), transitive_deps.end(),
                     [existing_column](const SchemaNode* dep) {
                       return (dep->As<const Column>() != nullptr &&
                               dep->As<const Column>()->Name() ==
                                   existing_column->Name());
                     }) != transitive_deps.end()) {
      return error::ViewReplaceRecursive(existing_column->Name());
    }
  }

  return absl::OkStatus();
}