void InformationSchemaCatalog::FillKeyColumnUsageTable()

in backend/query/information_schema_catalog.cc [2012:2147]


void InformationSchemaCatalog::FillKeyColumnUsageTable() {
  auto key_column_usage =
      tables_by_name_.at(GetNameForDialect(kKeyColumnUsage)).get();

  std::vector<std::vector<zetasql::Value>> rows;
  for (const auto* table : default_schema_->tables()) {
    const auto& [table_schema_part, table_name_part] =
        GetSchemaAndNameForInformationSchema(table->Name());

    // Add the primary key columns.
    int table_ordinal = 1;
    for (const auto* key_column : table->primary_key()) {
      rows.push_back({
          // constraint_catalog
          DialectTableCatalog(),
          // constraint_schema
          String(table_schema_part),
          // constraint_name
          String(PrimaryKeyName(table)),
          // table_catalog
          DialectTableCatalog(),
          // table_schema
          String(table_schema_part),
          // table_name
          String(table_name_part),
          // column_name
          String(key_column->column()->Name()),
          // ordinal_position
          Int64(table_ordinal++),
          // position_in_unique_constraint
          NullString(),
      });
    }

    // Add the foreign keys.
    for (const auto* foreign_key : table->foreign_keys()) {
      // Add the foreign key referencing columns.
      int foreign_key_ordinal = 1;
      for (const auto* column : foreign_key->referencing_columns()) {
        rows.push_back({
            // constraint_catalog
            DialectTableCatalog(),
            // constraint_schema
            String(table_schema_part),
            // constraint_name
            String(SDLObjectName::GetInSchemaName(foreign_key->Name())),
            // table_catalog
            DialectTableCatalog(),
            // table_schema
            String(table_schema_part),
            // table_name
            String(table_name_part),
            // column_name
            String(column->Name()),
            // ordinal_position
            Int64(foreign_key_ordinal),
            // position_in_unique_constraint
            Int64(foreign_key_ordinal),
        });
        ++foreign_key_ordinal;
      }

      // Add the foreign key's unique backing index columns.
      if (foreign_key->referenced_index()) {
        const auto& [referenced_table_schema_part, referenced_table_name_part] =
            GetSchemaAndNameForInformationSchema(
                foreign_key->referenced_table()->Name());
        int index_ordinal = 1;
        for (const auto* key_column :
             foreign_key->referenced_index()->key_columns()) {
          rows.push_back({
              // constraint_catalog
              DialectTableCatalog(),
              // constraint_schema
              String(table_schema_part),
              // constraint_name
              String(SDLObjectName::GetInSchemaName(
                  foreign_key->referenced_index()->Name())),
              // table_catalog
              DialectTableCatalog(),
              // table_schema
              String(referenced_table_schema_part),
              // table_name
              String(referenced_table_name_part),
              // column_name
              String(key_column->column()->Name()),
              // ordinal_position
              Int64(index_ordinal++),
              // position_in_unique_constraint
              NullInt64(),
          });
        }
      }
    }
  }

  // Production doesn't add check constraints for the information schema so we
  // also don't add it in the emulator.
  if (dialect_ != DatabaseDialect::POSTGRESQL) {
    // Add the information schema primary key columns.
    for (const auto& table : this->tables()) {
      int primary_key_ordinal = 1;
      for (int i = 0; i < table->NumColumns(); ++i) {
        const auto* column = table->GetColumn(i);
        const auto* metadata = FindKeyColumnMetadata(dialect_, table, column);
        if (metadata == nullptr) {
          continue;  // Not a primary key column.
        }
        rows.push_back({
            // constraint_catalog
            DialectTableCatalog(),
            // constraint_schema
            String(kInformationSchema),
            // constraint_name
            String(PrimaryKeyName(table)),
            // table_catalog
            DialectTableCatalog(),
            // table_schema
            String(kInformationSchema),
            // table_name
            String(table->Name()),
            // column_name
            String(metadata->column_name),
            // ordinal_position
            Int64(metadata->primary_key_ordinal > 0
                      ? metadata->primary_key_ordinal
                      : primary_key_ordinal++),
            // position_in_unique_constraint
            NullString(),
        });
      }
    }
  }

  key_column_usage->SetContents(rows);
}