void InformationSchemaCatalog::FillConstraintColumnUsageTable()

in backend/query/information_schema_catalog.cc [2159:2343]


void InformationSchemaCatalog::FillConstraintColumnUsageTable() {
  auto constraint_column_usage =
      tables_by_name_.at(GetNameForDialect(kConstraintColumnUsage)).get();

  std::vector<std::vector<zetasql::Value>> rows;
  for (const auto* table : default_schema_->tables()) {
    const auto& [table_schema_part, table_name_part] =
        GetSchemaAndNameForInformationSchema(table->Name());
    // Add the primary key columns.
    for (const auto* key_column : table->primary_key()) {
      rows.push_back({
          // table_catalog
          DialectTableCatalog(),
          // table_schema
          String(table_schema_part),
          // table_name
          String(table_name_part),
          // column_name
          String(key_column->column()->Name()),
          // constraint_catalog
          DialectTableCatalog(),
          // constraint_schema
          String(table_schema_part),
          // constraint_name
          String(PrimaryKeyName(table)),
      });
    }

    // Add the NOT NULL check constraints.
    for (const auto* column : table->columns()) {
      if (column->is_nullable()) {
        continue;
      }
      rows.push_back({
          // table_catalog
          DialectTableCatalog(),
          // table_schema
          String(table_schema_part),
          // table_name
          String(table_name_part),
          // column_name
          String(column->Name()),
          // constraint_catalog
          DialectTableCatalog(),
          // constraint_schema
          String(table_schema_part),
          // constraint_name
          String(CheckNotNullName(table, column)),
      });
    }

    // Add the check constraints defined by the ZETASQL_VLOG keyword.
    for (const auto* check_constraint : table->check_constraints()) {
      for (const auto* dep_column : check_constraint->dependent_columns()) {
        rows.push_back({
            // table_catalog
            DialectTableCatalog(),
            // table_schema
            String(table_schema_part),
            // table_name
            String(table_name_part),
            // column_name
            String(dep_column->Name()),
            // constraint_catalog
            DialectTableCatalog(),
            // constraint_schema
            String(table_schema_part),
            // constraint_name
            String(SDLObjectName::GetInSchemaName(check_constraint->Name())),
        });
      }
    }

    // Add the foreign keys.
    for (const auto* foreign_key : table->foreign_keys()) {
      const auto& [referenced_table_schema_part, referenced_table_name_part] =
          GetSchemaAndNameForInformationSchema(
              foreign_key->referenced_table()->Name());

      // Add the foreign key referenced columns.
      for (const auto* column : foreign_key->referenced_columns()) {
        rows.push_back({
            // table_catalog
            DialectTableCatalog(),
            // table_schema
            String(referenced_table_schema_part),
            // table_name
            String(referenced_table_name_part),
            // column_name
            String(column->Name()),
            // constraint_catalog
            DialectTableCatalog(),
            // constraint_schema
            String(table_schema_part),
            // constraint_name
            String(SDLObjectName::GetInSchemaName(foreign_key->Name())),
        });
      }

      // Add the foreign key's unique backing index columns.
      if (foreign_key->referenced_index()) {
        for (const auto* key_column :
             foreign_key->referenced_index()->key_columns()) {
          rows.push_back({
              // table_catalog
              DialectTableCatalog(),
              // table_schema
              String(referenced_table_schema_part),
              // table_name
              String(referenced_table_name_part),
              // column_name
              String(key_column->column()->Name()),
              // constraint_catalog
              DialectTableCatalog(),
              // constraint_schema
              String(table_schema_part),
              // constraint_name
              String(SDLObjectName::GetInSchemaName(
                  foreign_key->referenced_index()->Name())),
          });
        }
      }
    }
  }

  // Production doesn't add constraint column usage for the information schema
  // so we also don't add it in the emulator.
  if (dialect_ != DatabaseDialect::POSTGRESQL) {
    // Add the information schema primary key columns.
    for (const auto& table : this->tables()) {
      for (int i = 0; i < table->NumColumns(); ++i) {
        const auto* column = table->GetColumn(i);
        const auto* metadata = FindKeyColumnMetadata(dialect_, table, column);
        if (metadata == nullptr) {
          continue;  // Not a primary key column.
        }
        rows.push_back({
            // table_catalog
            DialectTableCatalog(),
            // table_schema
            String(kInformationSchema),
            // table_name
            String(table->Name()),
            // column_name
            String(metadata->column_name),
            // constraint_catalog
            DialectTableCatalog(),
            // constraint_schema
            String(kInformationSchema),
            // constraint_name
            String(PrimaryKeyName(table)),
        });
      }
    }

    // Add the information schema NOT NULL check constraints.
    for (const auto& table : this->tables()) {
      for (int i = 0; i < table->NumColumns(); ++i) {
        const auto* column = table->GetColumn(i);
        const auto& metadata = GetColumnMetadata(dialect_, table, column);
        if (IsNullable(metadata)) {
          continue;
        }
        rows.push_back({
            // table_catalog
            DialectTableCatalog(),
            // table_schema
            String(kInformationSchema),
            // table_name
            String(table->Name()),
            // column_name
            String(metadata.column_name),
            // constraint_catalog
            DialectTableCatalog(),
            // constraint_schema
            String(kInformationSchema),
            // constraint_name
            String(CheckNotNullName(table, column)),
        });
      }
    }
  }

  constraint_column_usage->SetContents(rows);
}