c/driver/framework/connection.h (230 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstdint>
#include <memory>
#include <string>
#include <string_view>
#include <utility>
#include <vector>
#include <arrow-adbc/adbc.h>
#include "driver/framework/base_driver.h"
#include "driver/framework/objects.h"
#include "driver/framework/utility.h"
namespace adbc::driver {
/// \brief The CRTP base implementation of an AdbcConnection.
///
/// Derived should override and implement the Impl methods, but not others.
/// Overridden methods should defer to the superclass version at the end.
/// (The Base typedef is provided to make this easier.) Derived should also
/// define a constexpr static symbol called kErrorPrefix that is used to
/// construct error messages.
template <typename Derived>
class Connection : public ObjectBase {
public:
using Base = Connection<Derived>;
/// \brief Whether autocommit is enabled or not (by default: enabled).
enum class AutocommitState {
kAutocommit,
kTransaction,
};
Connection() : ObjectBase() {}
~Connection() = default;
/// \internal
AdbcStatusCode Init(void* parent, AdbcError* error) override {
if (auto status = impl().InitImpl(parent); !status.ok()) {
return status.ToAdbc(error);
}
return ObjectBase::Init(parent, error);
}
/// \internal
AdbcStatusCode Cancel(AdbcError* error) { return ADBC_STATUS_NOT_IMPLEMENTED; }
/// \internal
AdbcStatusCode Commit(AdbcError* error) {
switch (autocommit_) {
case AutocommitState::kAutocommit:
return status::InvalidState(Derived::kErrorPrefix,
" No active transaction, cannot commit")
.ToAdbc(error);
case AutocommitState::kTransaction:
return impl().CommitImpl().ToAdbc(error);
}
assert(false);
return ADBC_STATUS_INTERNAL;
}
/// \internal
AdbcStatusCode GetInfo(const uint32_t* info_codes, size_t info_codes_length,
ArrowArrayStream* out, AdbcError* error) {
if (!out) {
RAISE_STATUS(error, status::InvalidArgument("out must be non-null"));
}
std::vector<uint32_t> codes(info_codes, info_codes + info_codes_length);
RAISE_RESULT(error, auto infos, impl().InfoImpl(codes));
RAISE_STATUS(error, MakeGetInfoStream(infos, out));
return ADBC_STATUS_OK;
}
/// \internal
AdbcStatusCode GetObjects(int c_depth, const char* catalog, const char* db_schema,
const char* table_name, const char** table_type,
const char* column_name, ArrowArrayStream* out,
AdbcError* error) {
const auto catalog_filter =
catalog ? std::make_optional(std::string_view(catalog)) : std::nullopt;
const auto schema_filter =
db_schema ? std::make_optional(std::string_view(db_schema)) : std::nullopt;
const auto table_filter =
table_name ? std::make_optional(std::string_view(table_name)) : std::nullopt;
const auto column_filter =
column_name ? std::make_optional(std::string_view(column_name)) : std::nullopt;
std::vector<std::string_view> table_type_filter;
while (table_type && *table_type) {
if (*table_type) {
table_type_filter.push_back(std::string_view(*table_type));
}
table_type++;
}
GetObjectsDepth depth = GetObjectsDepth::kColumns;
switch (c_depth) {
case ADBC_OBJECT_DEPTH_CATALOGS:
depth = GetObjectsDepth::kCatalogs;
break;
case ADBC_OBJECT_DEPTH_COLUMNS:
depth = GetObjectsDepth::kColumns;
break;
case ADBC_OBJECT_DEPTH_DB_SCHEMAS:
depth = GetObjectsDepth::kSchemas;
break;
case ADBC_OBJECT_DEPTH_TABLES:
depth = GetObjectsDepth::kTables;
break;
default:
return status::InvalidArgument(Derived::kErrorPrefix,
" GetObjects: invalid depth ", c_depth)
.ToAdbc(error);
}
RAISE_RESULT(error, auto helper, impl().GetObjectsImpl());
auto status = BuildGetObjects(helper.get(), depth, catalog_filter, schema_filter,
table_filter, column_filter, table_type_filter, out);
RAISE_STATUS(error, helper->Close());
RAISE_STATUS(error, status);
return ADBC_STATUS_OK;
}
/// \internal
Result<Option> GetOption(std::string_view key) override {
if (key == ADBC_CONNECTION_OPTION_AUTOCOMMIT) {
switch (autocommit_) {
case AutocommitState::kAutocommit:
return driver::Option(ADBC_OPTION_VALUE_ENABLED);
case AutocommitState::kTransaction:
return driver::Option(ADBC_OPTION_VALUE_DISABLED);
}
} else if (key == ADBC_CONNECTION_OPTION_CURRENT_CATALOG) {
UNWRAP_RESULT(auto catalog, impl().GetCurrentCatalogImpl());
if (catalog) {
return driver::Option(std::move(*catalog));
}
return driver::Option();
} else if (key == ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA) {
UNWRAP_RESULT(auto schema, impl().GetCurrentSchemaImpl());
if (schema) {
return driver::Option(std::move(*schema));
}
return driver::Option();
}
return Base::GetOption(key);
}
/// \internal
AdbcStatusCode GetStatistics(const char* catalog, const char* db_schema,
const char* table_name, char approximate,
ArrowArrayStream* out, AdbcError* error) {
return ADBC_STATUS_NOT_IMPLEMENTED;
}
/// \internal
AdbcStatusCode GetStatisticNames(ArrowArrayStream* out, AdbcError* error) {
return ADBC_STATUS_NOT_IMPLEMENTED;
}
/// \internal
AdbcStatusCode GetTableSchema(const char* catalog, const char* db_schema,
const char* table_name, ArrowSchema* schema,
AdbcError* error) {
if (!table_name) {
return status::InvalidArgument(Derived::kErrorPrefix,
" GetTableSchema: must provide table_name")
.ToAdbc(error);
}
std::memset(schema, 0, sizeof(*schema));
std::optional<std::string_view> catalog_param =
catalog ? std::make_optional(std::string_view(catalog)) : std::nullopt;
std::optional<std::string_view> db_schema_param =
db_schema ? std::make_optional(std::string_view(db_schema)) : std::nullopt;
std::string_view table_name_param = table_name;
return impl()
.GetTableSchemaImpl(catalog_param, db_schema_param, table_name_param, schema)
.ToAdbc(error);
}
/// \internal
AdbcStatusCode GetTableTypes(ArrowArrayStream* out, AdbcError* error) {
if (!out) {
RAISE_STATUS(error, status::InvalidArgument("out must be non-null"));
}
RAISE_RESULT(error, std::vector<std::string> table_types, impl().GetTableTypesImpl());
RAISE_STATUS(error, MakeTableTypesStream(table_types, out));
return ADBC_STATUS_OK;
}
/// \internal
AdbcStatusCode ReadPartition(const uint8_t* serialized_partition,
size_t serialized_length, ArrowArrayStream* out,
AdbcError* error) {
return ADBC_STATUS_NOT_IMPLEMENTED;
}
/// \internal
AdbcStatusCode Release(AdbcError* error) override {
return impl().ReleaseImpl().ToAdbc(error);
}
/// \internal
AdbcStatusCode Rollback(AdbcError* error) {
switch (autocommit_) {
case AutocommitState::kAutocommit:
return status::InvalidState(Derived::kErrorPrefix,
" No active transaction, cannot rollback")
.ToAdbc(error);
case AutocommitState::kTransaction:
return impl().RollbackImpl().ToAdbc(error);
}
assert(false);
return ADBC_STATUS_INTERNAL;
}
/// \internal
AdbcStatusCode SetOption(std::string_view key, Option value,
AdbcError* error) override {
return impl().SetOptionImpl(key, value).ToAdbc(error);
}
/// \brief Commit the current transaction and begin a new transaction.
///
/// Only called when autocommit is disabled.
Status CommitImpl() { return status::NotImplemented("Commit"); }
Result<std::optional<std::string>> GetCurrentCatalogImpl() { return std::nullopt; }
Result<std::optional<std::string>> GetCurrentSchemaImpl() { return std::nullopt; }
/// \brief Query the database catalog.
///
/// The default implementation assumes the underlying database allows
/// querying the catalog in a certain manner, embodied in the helper class
/// returned here. If the database can directly implement GetObjects, then
/// directly override GetObjects instead of using this helper.
Result<std::unique_ptr<GetObjectsHelper>> GetObjectsImpl() {
return std::make_unique<GetObjectsHelper>();
}
Status GetTableSchemaImpl(std::optional<std::string_view> catalog,
std::optional<std::string_view> db_schema,
std::string_view table_name, ArrowSchema* schema) {
return status::NotImplemented("GetTableSchema");
}
Result<std::vector<std::string>> GetTableTypesImpl() {
return std::vector<std::string>();
}
Result<std::vector<InfoValue>> InfoImpl(const std::vector<uint32_t>& codes) {
return std::vector<InfoValue>{};
}
Status InitImpl(void* parent) { return status::Ok(); }
Status ReleaseImpl() { return status::Ok(); }
Status RollbackImpl() { return status::NotImplemented("Rollback"); }
Status SetOptionImpl(std::string_view key, Option value) {
if (key == ADBC_CONNECTION_OPTION_AUTOCOMMIT) {
UNWRAP_RESULT(auto enabled, value.AsBool());
switch (autocommit_) {
case AutocommitState::kAutocommit: {
if (!enabled) {
UNWRAP_STATUS(impl().ToggleAutocommitImpl(false));
autocommit_ = AutocommitState::kTransaction;
}
break;
}
case AutocommitState::kTransaction: {
if (enabled) {
UNWRAP_STATUS(impl().ToggleAutocommitImpl(true));
autocommit_ = AutocommitState::kAutocommit;
}
break;
}
}
return status::Ok();
}
return status::NotImplemented(Derived::kErrorPrefix, " Unknown connection option ",
key, "=", value.Format());
}
Status ToggleAutocommitImpl(bool enable_autocommit) {
return status::NotImplemented(Derived::kErrorPrefix, " Cannot change autocommit");
}
protected:
AutocommitState autocommit_ = AutocommitState::kAutocommit;
private:
Derived& impl() { return static_cast<Derived&>(*this); }
};
} // namespace adbc::driver