backend/query/catalog.cc (537 lines of code) (raw):
//
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include "backend/query/catalog.h"
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>
#include "zetasql/public/catalog.h"
#include "zetasql/public/function.h"
#include "zetasql/public/function_signature.h"
#include "zetasql/public/property_graph.h"
#include "zetasql/public/types/type.h"
#include "zetasql/public/types/type_factory.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/ascii.h"
#include "absl/strings/match.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
#include "absl/types/span.h"
#include "backend/access/read.h"
#include "backend/query/change_stream/queryable_change_stream_tvf.h"
#include "backend/query/function_catalog.h"
#include "backend/query/information_schema_catalog.h"
#include "backend/query/pg_catalog.h"
#include "backend/query/queryable_model.h"
#include "backend/query/queryable_named_schema.h"
#include "backend/query/queryable_property_graph.h"
#include "backend/query/queryable_sequence.h"
#include "backend/query/queryable_table.h"
#include "backend/query/queryable_udf.h"
#include "backend/query/queryable_view.h"
#include "backend/query/spanner_sys_catalog.h"
#include "backend/schema/catalog/schema.h"
#include "backend/schema/catalog/sequence.h"
#include "common/errors.h"
#include "third_party/spanner_pg/datatypes/extended/conversion_finder.h"
#include "third_party/spanner_pg/datatypes/extended/pg_jsonb_type.h"
#include "third_party/spanner_pg/datatypes/extended/pg_numeric_type.h"
#include "third_party/spanner_pg/datatypes/extended/pg_oid_type.h"
#include "google/protobuf/descriptor.h"
#include "zetasql/base/status_macros.h"
namespace google {
namespace spanner {
namespace emulator {
namespace backend {
using postgres_translator::spangres::datatypes::GetPgJsonbType;
using postgres_translator::spangres::datatypes::GetPgNumericType;
using postgres_translator::spangres::datatypes::GetPgOidType;
// A sub-catalog used for resolving NET function lookups.
//
// It holds no functions of its own: every lookup is forwarded to the root
// catalog using the catalog-qualified name (e.g. "NET.<function>").
class NetCatalog : public zetasql::Catalog {
 public:
  explicit NetCatalog(backend::Catalog* root_catalog)
      : root_catalog_(root_catalog) {}

  static constexpr char kName[] = "NET";

  // Returns the dot-separated path of this catalog: "NET" when the root
  // catalog is unnamed, otherwise "<root>.NET".
  std::string FullName() const final {
    std::string name = root_catalog_->FullName();
    if (name.empty()) {
      return kName;
    }
    // Path components must be joined with '.', the same separator that
    // GetFunction uses when building qualified names.
    absl::StrAppend(&name, ".", kName);
    return name;
  }

  // Implementation of the zetasql::Catalog interface.
  absl::Status GetFunction(const std::string& name,
                           const zetasql::Function** function,
                           const FindOptions& options) final {
    // The list of all functions is maintained in the root catalog in the
    // form of their fully-qualified names. Just prefix the function name
    // with the name of 'this' catalog and delegate the request to parent.
    return root_catalog_->GetFunction(absl::StrJoin({FullName(), name}, "."),
                                      function, options);
  }

 private:
  // Not owned; the root catalog creates this sub-catalog and outlives it.
  backend::Catalog* root_catalog_;
};
// A sub-catalog used for resolving PG function lookups from GSQL queries.
// Required for supporting check constraints as PG queries are translated to
// GSQL queries before storing in the DDL statement.
//
// Functions and types are not stored here; lookups are forwarded to the root
// catalog under the catalog-qualified name (e.g. "PG.<name>").
class PGFunctionCatalog : public zetasql::Catalog {
 public:
  explicit PGFunctionCatalog(backend::Catalog* root_catalog)
      : root_catalog_(root_catalog) {}

  static constexpr char kName[] = "PG";
  static constexpr char kPgNumericTypeName[] = "PG.NUMERIC";
  static constexpr char kPgJsonbTypeName[] = "PG.JSONB";
  static constexpr char kPgOidTypeName[] = "PG.OID";

  // Returns the dot-separated path of this catalog: "PG" when the root catalog
  // is unnamed, otherwise "<root>.PG".
  std::string FullName() const final {
    std::string name = root_catalog_->FullName();
    if (name.empty()) {
      return kName;
    }
    // Path components must be joined with '.', the same separator that
    // GetFunction/GetType use when building qualified names.
    absl::StrAppend(&name, ".", kName);
    return name;
  }

  // Implementation of the zetasql::Catalog interface.
  absl::Status GetFunction(const std::string& name,
                           const zetasql::Function** function,
                           const FindOptions& options) final {
    // The list of all functions is maintained in the root catalog in the
    // form of their fully-qualified names. Just prefix the function name
    // with the name of 'this' catalog and delegate the request to parent.
    return root_catalog_->GetFunction(absl::StrJoin({FullName(), name}, "."),
                                      function, options);
  }

  // Similar to GetFunction. Types are maintained in the root catalog in the
  // form of their fully-qualified names, such as PG.NUMERIC and PG.JSONB.
  // Redirect the request to the parent.
  //
  // For PostgreSQL-dialect schemas, the extended types PG.NUMERIC, PG.JSONB
  // and PG.OID are resolved directly (case-insensitively) without consulting
  // the root catalog.
  absl::Status GetType(const std::string& name, const zetasql::Type** type,
                       const FindOptions& options) final {
    std::string full_name = absl::StrJoin({FullName(), name}, ".");
    if (root_catalog_->schema_ != nullptr &&
        root_catalog_->schema_->dialect() ==
            database_api::DatabaseDialect::POSTGRESQL) {
      if (absl::EqualsIgnoreCase(full_name, kPgNumericTypeName)) {
        *type = GetPgNumericType();
        return absl::OkStatus();
      } else if (absl::EqualsIgnoreCase(full_name, kPgJsonbTypeName)) {
        *type = GetPgJsonbType();
        return absl::OkStatus();
      } else if (absl::EqualsIgnoreCase(full_name, kPgOidTypeName)) {
        *type = GetPgOidType();
        return absl::OkStatus();
      }
    }
    return root_catalog_->GetType(full_name, type, options);
  }

 private:
  // Not owned; the root catalog creates this sub-catalog and outlives it.
  backend::Catalog* root_catalog_;
};
// Builds the query catalog for `schema`.
//
// Every schema object (named schema, UDF, sequence, table and its synonym,
// model, property graph, view) is wrapped in its Queryable* adapter. Objects
// with fully-qualified names are attached to their named schema; all others
// are registered directly on this catalog. When
// `change_stream_internal_lookup` is set, that change stream's internal
// partition and data tables are also exposed as tables. A TVF is registered
// for each change stream, and `types_` is seeded with every column type of
// the directly registered tables.
Catalog::Catalog(const Schema* schema, const FunctionCatalog* function_catalog,
                 zetasql::TypeFactory* type_factory,
                 const zetasql::AnalyzerOptions& options, RowReader* reader,
                 QueryEvaluator* query_evaluator,
                 std::optional<std::string> change_stream_internal_lookup)
    : schema_(schema),
      function_catalog_(function_catalog),
      type_factory_(type_factory) {
  // Named schemas are registered first so that the fully-qualified objects
  // below can be attached to them via AddObjectToNamedSchema.
  for (const auto* named_schema : schema->named_schemas()) {
    named_schemas_[named_schema->Name()] =
        std::make_unique<QueryableNamedSchema>(named_schema);
  }
  for (const auto* udf : schema->udfs()) {
    std::string udf_name = udf->Name();
    if (SDLObjectName::IsFullyQualifiedName(udf_name)) {
      const auto [schema_part, name_part] =
          SDLObjectName::SplitSchemaName(udf_name);
      absl::Status status = AddObjectToNamedSchema(
          std::string(schema_part),
          std::make_unique<QueryableUdf>(udf, schema->default_time_zone(), this,
                                         type_factory));
      LOG_IF(ERROR, !status.ok()) << status.message();
    } else {
      udfs_[udf->Name()] = std::make_unique<QueryableUdf>(
          udf, schema->default_time_zone(), this, type_factory);
    }
  }
  // Pass the sequences to the catalog. This step has to be before the below
  // schema objects, because sequences may be used by table columns and views.
  for (const backend::Sequence* sequence : schema->sequences()) {
    std::string name = sequence->Name();
    if (SDLObjectName::IsFullyQualifiedName(name)) {
      absl::Status status = AddObjectToNamedSchema(
          std::string(SDLObjectName::GetSchemaName(name)),
          std::make_unique<QueryableSequence>(sequence));
      LOG_IF(ERROR, !status.ok()) << status.message();
    } else {
      sequences_[sequence->Name()] =
          std::make_unique<QueryableSequence>(sequence);
    }
  }
  // Pass the reader to tables. A table synonym is registered as a second
  // QueryableTable over the same underlying table.
  for (const auto* table : schema->tables()) {
    std::string name = table->Name();
    if (SDLObjectName::IsFullyQualifiedName(name)) {
      absl::Status status = AddObjectToNamedSchema(
          std::string(SDLObjectName::GetSchemaName(name)),
          std::make_unique<QueryableTable>(table, reader, options, this,
                                           type_factory));
      LOG_IF(ERROR, !status.ok()) << status.message();
    } else {
      tables_[table->Name()] = std::make_unique<QueryableTable>(
          table, reader, options, this, type_factory);
    }
    std::string synonym_name = table->synonym();
    if (!synonym_name.empty()) {
      if (SDLObjectName::IsFullyQualifiedName(synonym_name)) {
        absl::Status status = AddObjectToNamedSchema(
            std::string(SDLObjectName::GetSchemaName(synonym_name)),
            std::make_unique<QueryableTable>(table, reader, options, this,
                                             type_factory,
                                             /*is_synonym=*/true));
        LOG_IF(ERROR, !status.ok()) << status.message();
      } else {
        tables_[synonym_name] = std::make_unique<QueryableTable>(
            table, reader, options, this, type_factory, /*is_synonym=*/true);
      }
    }
  }
  for (const auto* model : schema->models()) {
    std::string name = model->Name();
    if (SDLObjectName::IsFullyQualifiedName(name)) {
      absl::Status status = AddObjectToNamedSchema(
          std::string(SDLObjectName::GetSchemaName(name)),
          std::make_unique<QueryableModel>(model));
      LOG_IF(ERROR, !status.ok()) << status.message();
    } else {
      models_[model->Name()] = std::make_unique<QueryableModel>(model);
    }
  }
  for (const auto* graph : schema->property_graphs()) {
    std::string name = graph->Name();
    // Confirm that the property graph is not in a named schema as that is not
    // supported yet.
    if (SDLObjectName::IsFullyQualifiedName(name)) {
      ABSL_LOG(FATAL) << "PropertyGraph not supported in named schemas. " << name;
    } else {
      for (const auto& node_table : graph->NodeTables()) {
        std::string node_table_name = node_table.name();
        if (SDLObjectName::IsFullyQualifiedName(node_table_name)) {
          ABSL_LOG(FATAL) << "PropertyGraph not supported in named schemas. "
                          << "Element table is within a named schema: "
                          << node_table_name;
        }
      }
      for (const auto& edge_table : graph->EdgeTables()) {
        std::string edge_table_name = edge_table.name();
        if (SDLObjectName::IsFullyQualifiedName(edge_table_name)) {
          ABSL_LOG(FATAL) << "PropertyGraph not supported in named schemas. "
                          << "Element table is within a named schema: "
                          << edge_table_name;
        }
      }
      property_graphs_[graph->Name()] =
          std::make_unique<QueryablePropertyGraph>(this, type_factory, graph);
    }
  }
  // Pass the query_evaluator to views.
  for (const auto* view : schema->views()) {
    std::string name = view->Name();
    if (SDLObjectName::IsFullyQualifiedName(name)) {
      absl::Status status = AddObjectToNamedSchema(
          std::string(SDLObjectName::GetSchemaName(name)),
          std::make_unique<QueryableView>(view, query_evaluator));
      LOG_IF(ERROR, !status.ok()) << status.message();
    } else {
      views_[view->Name()] =
          std::make_unique<QueryableView>(view, query_evaluator);
    }
  }
  if (change_stream_internal_lookup.has_value()) {
    auto change_stream =
        schema->FindChangeStream(change_stream_internal_lookup.value());
    // Guard against an unknown name instead of dereferencing a null pointer.
    if (change_stream == nullptr) {
      ABSL_LOG(ERROR) << "Change stream not found for internal lookup: "
                      << change_stream_internal_lookup.value();
    } else {
      auto partition_table = change_stream->change_stream_partition_table();
      auto data_table = change_stream->change_stream_data_table();
      tables_[partition_table->Name()] = std::make_unique<QueryableTable>(
          partition_table, reader, options, this, type_factory);
      tables_[data_table->Name()] = std::make_unique<QueryableTable>(
          data_table, reader, options, this, type_factory);
    }
  }
  // Register a table valued function for each change stream in the schema.
  for (const auto* change_stream : schema->change_streams()) {
    auto tvf = QueryableChangeStreamTvf::Create(
        change_stream->tvf_name(), options, this, type_factory,
        schema->dialect() == database_api::DatabaseDialect::POSTGRESQL);
    // Dereferencing a failed StatusOr aborts; log and skip this TVF instead.
    if (!tvf.ok()) {
      ABSL_LOG(ERROR) << "Failed to create table valued function "
                      << change_stream->tvf_name() << ": " << tvf.status();
      continue;
    }
    tvfs_[change_stream->tvf_name()] = std::move(*tvf);
  }
  // Read types: index every column type of the directly registered tables by
  // its external type name.
  for (const auto& tablepair : tables_) {
    const QueryableTable* table = tablepair.second.get();
    for (int i = 0; i < table->NumColumns(); ++i) {
      std::string type_name = table->GetColumn(i)->GetType()->TypeName(
          zetasql::PRODUCT_EXTERNAL, /*use_external_float32=*/true);
      types_[type_name] = table->GetColumn(i)->GetType();
    }
  }
  if (absl::Status status = PopulateSystemProcedureMap(); !status.ok()) {
    // Not an error that requires us to exit the emulator, but the cause must
    // be visible in the log.
    ABSL_LOG(ERROR) << "Failed to populate system procedure map: " << status;
  }
}
// Registers the system procedures the emulator supports, currently only
// cancel_query(STRING) -> BOOL. Returns an internal error if the entry was
// already present.
absl::Status Catalog::PopulateSystemProcedureMap() {
  // A context id would let the implementation map the signature back to an
  // evaluator; no such context is needed here, hence -1.
  zetasql::FunctionSignature cancel_query_signature(
      zetasql::types::BoolType(),
      {zetasql::FunctionArgumentType(zetasql::types::StringType())},
      /*context_id=*/-1);
  auto cancel_query = std::make_unique<zetasql::Procedure>(
      std::vector<std::string>{"cancel_query"}, cancel_query_signature);
  const bool was_added =
      procedures_.emplace("cancel_query", std::move(cancel_query)).second;
  if (was_added) {
    return absl::OkStatus();
  }
  return absl::InternalError("Unable to populate system procedure map.");
}
// Resolves a sub-catalog by name.
//
// System catalogs (INFORMATION_SCHEMA, the PG information schema,
// SPANNER_SYS, NET, PG, and pg_catalog) are matched case-insensitively. Any
// other name is looked up among the schema's named schemas. Always returns OK;
// `*catalog` is nullptr when nothing matches. This includes pg_catalog on a
// non-PostgreSQL schema, because GetPGCatalog returns nullptr there.
absl::Status Catalog::GetCatalog(const std::string& name,
                                 zetasql::Catalog** catalog,
                                 const FindOptions& options) {
  // Reset the out-parameter so that an unrecognized name reliably reports
  // "not found" (nullptr with OK status), as GetModel does, rather than
  // leaving whatever value the caller passed in.
  *catalog = nullptr;
  if (absl::EqualsIgnoreCase(name, InformationSchemaCatalog::kName)) {
    *catalog = GetInformationSchemaCatalog();
  } else if (absl::EqualsIgnoreCase(name, InformationSchemaCatalog::kPGName)) {
    *catalog = GetPGInformationSchemaCatalog();
  } else if (absl::EqualsIgnoreCase(name, SpannerSysCatalog::kName)) {
    *catalog = GetSpannerSysCatalog();
  } else if (absl::EqualsIgnoreCase(name, NetCatalog::kName)) {
    *catalog = GetNetFunctionsCatalog();
  } else if (absl::EqualsIgnoreCase(name, PGFunctionCatalog::kName)) {
    *catalog = GetPGFunctionsCatalog();
  } else if (absl::EqualsIgnoreCase(name,
                                    postgres_translator::PGCatalog::kName)) {
    *catalog = GetPGCatalog();
  } else if (QueryableNamedSchema* named_schema = GetNamedSchema(name);
             named_schema != nullptr) {
    *catalog = named_schema;
  }
  return absl::OkStatus();
}
// Looks up a table or view by exact name. Views are consulted before tables.
// On a miss, `*table` is set to nullptr and a TableNotFound error is returned.
absl::Status Catalog::GetTable(const std::string& name,
                               const zetasql::Table** table,
                               const FindOptions& options) {
  auto view_entry = views_.find(name);
  if (view_entry != views_.end()) {
    *table = view_entry->second.get();
    return absl::OkStatus();
  }
  auto table_entry = tables_.find(name);
  if (table_entry == tables_.end()) {
    *table = nullptr;
    return error::TableNotFound(name);
  }
  *table = table_entry->second.get();
  return absl::OkStatus();
}
// Looks up a model by exact name. A missing model is not an error: `*model`
// is set to nullptr and OK is returned.
absl::Status Catalog::GetModel(const std::string& name,
                               const zetasql::Model** model,
                               const FindOptions& options) {
  auto entry = models_.find(name);
  *model = (entry == models_.end()) ? nullptr : entry->second.get();
  return absl::OkStatus();
}
// Finds a conversion between `from_type` and `to_type` by delegating to the
// spangres extended-type conversion finder. Errors from the finder are
// propagated unchanged; on success the result is written to `*conversion`.
absl::Status Catalog::FindConversion(
    const zetasql::Type* from_type, const zetasql::Type* to_type,
    const zetasql::Catalog::FindConversionOptions& options,
    zetasql::Conversion* conversion) {
  namespace datatypes = postgres_translator::spangres::datatypes;
  ZETASQL_ASSIGN_OR_RETURN(
      auto found_conversion,
      datatypes::FindExtendedTypeConversion(from_type, to_type, options));
  *conversion = std::move(found_conversion);
  return absl::OkStatus();
}
// Looks up a property graph by exact name. On a miss, `graph` is left
// untouched and a PropertyGraphNotFound error is returned.
absl::Status Catalog::GetPropertyGraph(std::string_view name,
                                       const zetasql::PropertyGraph*& graph,
                                       const FindOptions& options) {
  const std::string key(name);
  auto entry = property_graphs_.find(key);
  if (entry == property_graphs_.end()) {
    return error::PropertyGraphNotFound(name);
  }
  graph = entry->second.get();
  return absl::OkStatus();
}
// Resolves a table-valued function from its dotted `path`.
//
// TVFs registered on this catalog (one per change stream, created in the
// constructor) are checked first; otherwise the lookup is delegated to
// `function_catalog_`. Returns a not-found error if neither has a match.
absl::Status Catalog::FindTableValuedFunction(
    const absl::Span<const std::string>& path,
    const zetasql::TableValuedFunction** function,
    const FindOptions& options) {
  *function = nullptr;
  const std::string joined_path = absl::StrJoin(path, ".");
  auto registered = tvfs_.find(joined_path);
  if (registered != tvfs_.end()) {
    *function = registered->second.get();
    return absl::OkStatus();
  }
  function_catalog_->GetTableValuedFunction(joined_path, function);
  if (*function != nullptr) {
    return absl::OkStatus();
  }
  return TableValuedFunctionNotFoundError(path);
}
// Looks up a function by name: schema-defined UDFs first, then
// `function_catalog_`. Always returns OK; `*function` stays nullptr when
// neither source has a match. This assumes function_catalog_ only writes on a
// hit; TODO confirm.
absl::Status Catalog::GetFunction(const std::string& name,
                                  const zetasql::Function** function,
                                  const FindOptions& options) {
  // Reset the out-parameter up front, consistent with GetTable/GetModel/
  // GetSequence/GetType, so a miss never leaves a stale caller value behind.
  *function = nullptr;
  if (auto it = udfs_.find(name); it != udfs_.end()) {
    *function = it->second.get();
    return absl::OkStatus();
  }
  function_catalog_->GetFunction(name, function);
  return absl::OkStatus();
}
// Looks up a system procedure. The name is lowercased before the lookup, so
// matching is case-insensitive against the lowercase keys registered by
// PopulateSystemProcedureMap. Unknown names yield UnsupportedProcedure.
absl::Status Catalog::GetProcedure(const std::string& full_name,
                                   const zetasql::Procedure** procedure,
                                   const FindOptions& options) {
  const std::string lowered = absl::AsciiStrToLower(full_name);
  if (auto entry = procedures_.find(lowered); entry != procedures_.end()) {
    *procedure = entry->second.get();
    return absl::OkStatus();
  }
  return error::UnsupportedProcedure(full_name);
}
// Looks up a sequence registered directly on this catalog (not one inside a
// named schema). On a miss, `*sequence` is nullptr and SequenceNotFound is
// returned.
absl::Status Catalog::GetSequence(const std::string& name,
                                  const zetasql::Sequence** sequence,
                                  const FindOptions& options) {
  auto entry = sequences_.find(name);
  if (entry == sequences_.end()) {
    *sequence = nullptr;
    return error::SequenceNotFound(name);
  }
  *sequence = entry->second.get();
  return absl::OkStatus();
}
// Adds the INFORMATION_SCHEMA, SPANNER_SYS and NET sub-catalogs, plus every
// named schema, to `output`. Catalogs are created lazily if not yet built.
absl::Status Catalog::GetCatalogs(
    absl::flat_hash_set<const zetasql::Catalog*>* output) const {
  output->insert({GetInformationSchemaCatalog(), GetSpannerSysCatalog(),
                  GetNetFunctionsCatalog()});
  ZETASQL_RETURN_IF_ERROR(GetNamedSchemas(output));
  return absl::OkStatus();
}
// Adds every table (including synonyms) and every view registered directly on
// this catalog to `output`.
absl::Status Catalog::GetTables(
    absl::flat_hash_set<const zetasql::Table*>* output) const {
  for (const auto& [unused_table_name, queryable_table] : tables_) {
    output->insert(queryable_table.get());
  }
  for (const auto& [unused_view_name, queryable_view] : views_) {
    output->insert(queryable_view.get());
  }
  return absl::OkStatus();
}
// Adds every column type collected by the constructor to `output`.
absl::Status Catalog::GetTypes(
    absl::flat_hash_set<const zetasql::Type*>* output) const {
  for (const auto& entry : types_) {
    output->insert(entry.second);
  }
  return absl::OkStatus();
}
// Resolves a type by name. Candidates are tried in order:
//   1. column types collected from tables in the constructor (`types_`);
//   2. a proto message type from the schema's proto bundle;
//   3. a proto enum type from the schema's proto bundle.
// If none match, returns the error from the proto message lookup.
absl::Status Catalog::GetType(const std::string& name,
                              const zetasql::Type** type,
                              const FindOptions& options) {
  *type = nullptr;
  if (auto it = types_.find(name); it != types_.end()) {
    *type = it->second;
    return absl::OkStatus();
  }
  absl::StatusOr<const google::protobuf::Descriptor*> descriptor =
      schema_->proto_bundle()->GetTypeDescriptor(name);
  if (descriptor.ok()) {
    ZETASQL_RETURN_IF_ERROR(type_factory_->MakeProtoType(descriptor.value(), type));
    return absl::OkStatus();
  }
  auto enum_descriptor = schema_->proto_bundle()->GetEnumTypeDescriptor(name);
  if (enum_descriptor.ok()) {
    ZETASQL_RETURN_IF_ERROR(
        type_factory_->MakeEnumType(enum_descriptor.value(), type));
    return absl::OkStatus();
  }
  // Neither a message nor an enum: surface the message-descriptor lookup
  // error. NOTE(review): error::TypeNotFound(name) may be the intended result
  // here; confirm with callers before changing the returned error.
  return descriptor.status();
}
// Adds every schema-defined UDF registered directly on this catalog, followed
// by everything `function_catalog_` exposes, to `output`.
absl::Status Catalog::GetFunctions(
    absl::flat_hash_set<const zetasql::Function*>* output) const {
  for (auto udf_it = udfs_.cbegin(); udf_it != udfs_.cend(); ++udf_it) {
    output->insert(udf_it->second.get());
  }
  function_catalog_->GetFunctions(output);
  return absl::OkStatus();
}
// Adds the TVFs registered on this catalog (the per-change-stream TVFs) to
// `output`. NOTE(review): TVFs resolvable through function_catalog_ in
// FindTableValuedFunction are not enumerated here; confirm that is intended.
absl::Status Catalog::GetTableValuedFunctions(
    absl::flat_hash_set<const zetasql::TableValuedFunction*>* output) const {
  for (auto tvf_it = tvfs_.cbegin(); tvf_it != tvfs_.cend(); ++tvf_it) {
    output->insert(tvf_it->second.get());
  }
  return absl::OkStatus();
}
// Returns the INFORMATION_SCHEMA catalog, creating it on first use under
// `mu_`. The SPANNER_SYS catalog is created (if needed) on every call, since
// the information schema is built on top of it.
zetasql::Catalog* Catalog::GetInformationSchemaCatalog() const {
  absl::MutexLock lock(&mu_);
  SpannerSysCatalog* sys_catalog = GetSpannerSysCatalogWithoutLocks();
  if (information_schema_catalog_ != nullptr) {
    return information_schema_catalog_.get();
  }
  information_schema_catalog_ = std::make_unique<InformationSchemaCatalog>(
      InformationSchemaCatalog::kName, schema_, sys_catalog);
  return information_schema_catalog_.get();
}
// Locking wrapper: acquires `mu_` and returns the lazily created SPANNER_SYS
// catalog.
SpannerSysCatalog* Catalog::GetSpannerSysCatalog() const {
  absl::MutexLock guard(&mu_);
  SpannerSysCatalog* sys_catalog = GetSpannerSysCatalogWithoutLocks();
  return sys_catalog;
}
// Returns the SPANNER_SYS catalog, creating it on first use. Acquires no lock;
// every caller in this file already holds `mu_`.
SpannerSysCatalog* Catalog::GetSpannerSysCatalogWithoutLocks() const {
  if (spanner_sys_catalog_ != nullptr) {
    return spanner_sys_catalog_.get();
  }
  spanner_sys_catalog_ = std::make_unique<SpannerSysCatalog>();
  return spanner_sys_catalog_.get();
}
// Returns the PostgreSQL-named information schema catalog (kPGName), creating
// it on first use under `mu_`. Like the GoogleSQL variant, it is built on the
// SPANNER_SYS catalog, which is ensured on every call.
zetasql::Catalog* Catalog::GetPGInformationSchemaCatalog() const {
  absl::MutexLock lock(&mu_);
  SpannerSysCatalog* sys_catalog = GetSpannerSysCatalogWithoutLocks();
  if (pg_information_schema_catalog_ != nullptr) {
    return pg_information_schema_catalog_.get();
  }
  pg_information_schema_catalog_ = std::make_unique<InformationSchemaCatalog>(
      InformationSchemaCatalog::kPGName, schema_, sys_catalog);
  return pg_information_schema_catalog_.get();
}
// Returns the NET sub-catalog, creating it on first use under `mu_`. The
// const_cast is needed because NetCatalog forwards lookups to a non-const
// root catalog.
zetasql::Catalog* Catalog::GetNetFunctionsCatalog() const {
  absl::MutexLock lock(&mu_);
  if (net_catalog_ != nullptr) {
    return net_catalog_.get();
  }
  auto* mutable_root = const_cast<Catalog*>(this);
  net_catalog_ = std::make_unique<NetCatalog>(mutable_root);
  return net_catalog_.get();
}
// Returns the PG function sub-catalog, creating it on first use under `mu_`.
// The const_cast is needed because PGFunctionCatalog forwards lookups to a
// non-const root catalog.
zetasql::Catalog* Catalog::GetPGFunctionsCatalog() const {
  absl::MutexLock lock(&mu_);
  if (pg_function_catalog_ != nullptr) {
    return pg_function_catalog_.get();
  }
  auto* mutable_root = const_cast<Catalog*>(this);
  pg_function_catalog_ = std::make_unique<PGFunctionCatalog>(mutable_root);
  return pg_function_catalog_.get();
}
// Returns the pg_catalog sub-catalog for PostgreSQL-dialect schemas, creating
// it on first use under `mu_`. Returns nullptr for any other dialect.
zetasql::Catalog* Catalog::GetPGCatalog() const {
  absl::MutexLock lock(&mu_);
  if (schema_->dialect() != database_api::DatabaseDialect::POSTGRESQL) {
    return nullptr;
  }
  if (pg_catalog_ == nullptr) {
    pg_catalog_ =
        std::make_unique<postgres_translator::PGCatalog>(this, schema_);
  }
  return pg_catalog_.get();
}
// Returns the named schema registered under `name`, or nullptr if none exists.
QueryableNamedSchema* Catalog::GetNamedSchema(const std::string& name) {
  auto entry = named_schemas_.find(name);
  return entry == named_schemas_.end() ? nullptr : entry->second.get();
}
// Adds every named schema of this catalog to `output`.
absl::Status Catalog::GetNamedSchemas(
    absl::flat_hash_set<const zetasql::Catalog*>* output) const {
  for (const auto& [unused_schema_name, named_schema] : named_schemas_) {
    output->insert(named_schema.get());
  }
  return absl::OkStatus();
}
// Transfers `object` into the named schema `named_schema_name`. Returns
// NamedSchemaNotFound (and drops `object`) if no such schema is registered.
template <typename T>
absl::Status Catalog::AddObjectToNamedSchema(
    const std::string& named_schema_name, T object) {
  if (QueryableNamedSchema* target = GetNamedSchema(named_schema_name);
      target != nullptr) {
    target->AddObject(std::move(object));
    return absl::OkStatus();
  }
  return error::NamedSchemaNotFound(named_schema_name);
}
} // namespace backend
} // namespace emulator
} // namespace spanner
} // namespace google