in backend/query/pg_catalog.cc [1852:2174]
void PGCatalog::FillPGProcTable() {
auto pg_proc = tables_by_name_.at(kPGProc).get();
std::vector<std::vector<zetasql::Value>> rows;
absl::flat_hash_set<const PostgresExtendedFunction*> functions;
auto status = system_catalog_->GetPostgreSQLFunctions(&functions);
if (!status.ok()) {
ZETASQL_VLOG(1) << "Failed to get table-valued functions: " << status;
return;
}
for (const PostgresExtendedFunction* function : functions) {
for (const auto& signature : function->GetPostgresSignatures()) {
auto proc_metadata = GetPgProcDataFromBootstrap(
PgBootstrapCatalog::Default(), signature->postgres_proc_oid());
std::vector<zetasql::Value> proargtypes;
for (const auto& argtype : proc_metadata->proargtypes()) {
proargtypes.push_back(CreatePgOidValue(argtype).value());
}
rows.push_back({
// oid
CreatePgOidValue(proc_metadata->oid()).value(),
// proname
String(proc_metadata->proname()),
// pronamespace
CreatePgOidValue(proc_metadata->pronamespace()).value(),
// proowner
NullPgOid(),
// prolang
NullPgOid(),
// procost
NullDouble(),
// prorows
NullDouble(),
// provariadic
CreatePgOidValue(proc_metadata->provariadic()).value(),
// prokind
String(proc_metadata->prokind()),
// prosecdef
NullBool(),
// proleakproof
NullBool(),
// proisstrict
NullBool(),
// proretset
Bool(proc_metadata->proretset()),
// provolatile
NullString(),
// proparallel
NullString(),
// pronargs
Int64(proc_metadata->pronargs()),
// pronargdefaults
Int64(proc_metadata->pronargdefaults()),
// prorettype
CreatePgOidValue(proc_metadata->prorettype()).value(),
// proargtypes
zetasql::Value::MakeArray(GetPgOidArrayType(), proargtypes).value(),
// proallargtypes
Null(GetPgOidArrayType()),
// proargmodes
Null(StringArrayType()),
// proargnames
Null(StringArrayType()),
// proargdefaults
NullString(),
// protrftypes
Null(GetPgOidArrayType()),
// prosrc
NullString(),
// probin
NullString(),
// prosqlbody
String(proc_metadata->prosqlbody()),
// proconfig
Null(StringArrayType()),
});
}
}
absl::flat_hash_map<Oid, const zetasql::TableValuedFunction*> tvfs;
status = system_catalog_->GetTableValuedFunctions(&tvfs);
if (!status.ok()) {
ZETASQL_VLOG(1) << "Failed to get table-valued functions: " << status;
return;
}
for (const auto& [tvf_oid, tvf] : tvfs) {
auto signature = tvf->GetSignature(0);
std::vector<zetasql::Value> proargtypes;
int variadic_type_oid = 0;
if (!signature->result_type().IsRelation()) {
ZETASQL_VLOG(1) << "Table-valued functions must return a relation type.";
continue;
}
auto& rettype_options = signature->result_type().options();
if (rettype_options.has_relation_input_schema() &&
rettype_options.relation_input_schema().num_columns() != 1) {
ZETASQL_VLOG(1) << "Table-valued functions must return a relation type "
<< "with exactly one column.";
continue;
}
auto rettype_mapping = system_catalog_->GetTypeFromReverseMapping(
rettype_options.relation_input_schema().column(0).type);
if (rettype_mapping == nullptr) {
ZETASQL_VLOG(1) << "Failed to get type mapping for "
<< signature->result_type().DebugString();
continue;
}
int rettype_oid = rettype_mapping->PostgresTypeOid();
for (const auto& arg : signature->arguments()) {
auto type_mapping =
system_catalog_->GetTypeFromReverseMapping(arg.type());
if (type_mapping == nullptr) {
ZETASQL_VLOG(1) << "Failed to get type mapping for "
<< arg.type()->DebugString();
continue;
}
if (arg.repeated()) {
variadic_type_oid = type_mapping->PostgresTypeOid();
}
proargtypes.push_back(
CreatePgOidValue(type_mapping->PostgresTypeOid()).value());
}
rows.push_back({
// oid
CreatePgOidValue(tvf_oid).value(),
// proname
String(tvf->Name()),
// pronamespace
CreatePgOidValue(50000).value(),
// proowner
NullPgOid(),
// prolang
NullPgOid(),
// procost
NullDouble(),
// prorows
NullDouble(),
// provariadic
CreatePgOidValue(variadic_type_oid).value(),
// prokind
String("f"),
// prosecdef
NullBool(),
// proleakproof
NullBool(),
// proisstrict
NullBool(),
// proretset
Bool(true),
// provolatile
NullString(),
// proparallel
NullString(),
// pronargs
Int64(signature->arguments().size()),
// pronargdefaults
Int64(0),
// prorettype
CreatePgOidValue(rettype_oid).value(),
// proargtypes
zetasql::Value::MakeArray(GetPgOidArrayType(), proargtypes).value(),
// proallargtypes
Null(GetPgOidArrayType()),
// proargmodes
Null(StringArrayType()),
// proargnames
Null(StringArrayType()),
// proargdefaults
NullString(),
// protrftypes
Null(GetPgOidArrayType()),
// prosrc
NullString(),
// probin
NullString(),
// prosqlbody
NullString(),
// proconfig
Null(StringArrayType()),
});
}
absl::flat_hash_set<const zetasql::TableValuedFunction*> changestream_tvfs;
status = root_catalog_->GetTableValuedFunctions(&changestream_tvfs);
if (!status.ok()) {
ZETASQL_VLOG(1) << "Failed to get table-valued functions from root catalog: "
<< status;
return;
}
absl::flat_hash_map<std::string, const zetasql::TableValuedFunction*>
changestream_tvfs_map;
for (const zetasql::TableValuedFunction* tvf : changestream_tvfs) {
changestream_tvfs_map[tvf->Name()] = tvf;
}
for (const ChangeStream* change_stream : default_schema_->change_streams()) {
if (!changestream_tvfs_map.contains(change_stream->tvf_name())) {
ZETASQL_VLOG(1) << "Change stream " << change_stream->Name()
<< " does not have a table-valued function.";
continue;
}
if (!change_stream->tvf_postgresql_oid().has_value()) {
ZETASQL_VLOG(1) << "Change stream TVF " << change_stream->Name()
<< " does not have a PostgreSQL OID.";
continue;
}
const auto& [change_stream_schema, change_stream_name] =
GetSchemaAndNameForPGCatalog(change_stream->Name());
int namespace_oid = 0;
if (kHardCodedNamedSchemaOid.contains(change_stream_schema)) {
namespace_oid = kHardCodedNamedSchemaOid.at(change_stream_schema);
} else {
const NamedSchema* named_schema =
default_schema_->FindNamedSchema(change_stream_schema);
if (!named_schema->postgresql_oid().has_value()) {
ZETASQL_VLOG(1) << "Named schema " << change_stream_schema
<< " does not have a PostgreSQL OID.";
continue;
}
namespace_oid = named_schema->postgresql_oid().value();
}
auto signature =
changestream_tvfs_map[change_stream->tvf_name()]->GetSignature(0);
std::vector<zetasql::Value> proargtypes;
int variadic_type_oid = 0;
if (!signature->result_type().IsRelation()) {
ZETASQL_VLOG(1) << "Table-valued functions must return a relation type.";
continue;
}
auto& rettype_options = signature->result_type().options();
if (rettype_options.has_relation_input_schema() &&
rettype_options.relation_input_schema().num_columns() != 1) {
ZETASQL_VLOG(1) << "Table-valued functions must return a relation type "
<< "with exactly one column.";
continue;
}
auto rettype_mapping = system_catalog_->GetTypeFromReverseMapping(
rettype_options.relation_input_schema().column(0).type);
if (rettype_mapping == nullptr) {
ZETASQL_VLOG(1) << "Failed to get type mapping for "
<< signature->result_type().DebugString();
continue;
}
int rettype_oid = rettype_mapping->PostgresTypeOid();
for (const auto& arg : signature->arguments()) {
auto type_mapping =
system_catalog_->GetTypeFromReverseMapping(arg.type());
if (type_mapping == nullptr) {
ZETASQL_VLOG(1) << "Failed to get type mapping for "
<< arg.type()->DebugString();
continue;
} else if (type_mapping->PostgresTypeOid() == TEXTOID) {
type_mapping = types::PgVarcharMapping();
} else if (type_mapping->PostgresTypeOid() == TEXTARRAYOID) {
type_mapping = types::PgVarcharArrayMapping();
}
if (arg.repeated()) {
variadic_type_oid = type_mapping->PostgresTypeOid();
}
proargtypes.push_back(
CreatePgOidValue(type_mapping->PostgresTypeOid()).value());
}
rows.push_back({
// oid
CreatePgOidValue(change_stream->tvf_postgresql_oid().value()).value(),
// proname
String(change_stream->tvf_name()),
// pronamespace
CreatePgOidValue(namespace_oid).value(),
// proowner
NullPgOid(),
// prolang
NullPgOid(),
// procost
NullDouble(),
// prorows
NullDouble(),
// provariadic
CreatePgOidValue(variadic_type_oid).value(),
// prokind
String("f"),
// prosecdef
NullBool(),
// proleakproof
NullBool(),
// proisstrict
NullBool(),
// proretset
Bool(true),
// provolatile
NullString(),
// proparallel
NullString(),
// pronargs
Int64(signature->arguments().size()),
// pronargdefaults
Int64(0),
// prorettype
CreatePgOidValue(rettype_oid).value(),
// proargtypes
zetasql::Value::MakeArray(GetPgOidArrayType(), proargtypes).value(),
// proallargtypes
Null(GetPgOidArrayType()),
// proargmodes
Null(StringArrayType()),
// proargnames
Null(StringArrayType()),
// proargdefaults
NullString(),
// protrftypes
Null(GetPgOidArrayType()),
// prosrc
NullString(),
// probin
NullString(),
// prosqlbody
NullString(),
// proconfig
Null(StringArrayType()),
});
}
pg_proc->SetContents(rows);
}