c/driver/framework/statement.h (313 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #pragma once #include <cstdint> #include <memory> #include <optional> #include <string> #include <utility> #include <variant> #include <vector> #include "driver/framework/base_driver.h" #include "driver/framework/status.h" #include "driver/framework/utility.h" namespace adbc::driver { /// \brief A base implementation of a statement. template <typename Derived> class Statement : public BaseStatement<Derived> { public: using Base = Statement<Derived>; /// \brief What to do in ingestion when the table does not exist. enum class TableDoesNotExist { kCreate, kFail, }; /// \brief What to do in ingestion when the table already exists. enum class TableExists { kAppend, kFail, kReplace, }; /// \brief Statement state: initialized with no set query. struct EmptyState {}; /// \brief Statement state: bulk ingestion. struct IngestState { std::optional<std::string> target_catalog; std::optional<std::string> target_schema; std::optional<std::string> target_table; bool temporary = false; TableDoesNotExist table_does_not_exist_ = TableDoesNotExist::kCreate; TableExists table_exists_ = TableExists::kFail; }; /// \brief Statement state: prepared statement. struct PreparedState { std::string query; }; /// \brief Statement state: ad-hoc query. struct QueryState { std::string query; }; /// \brief Statement state: one of the above. using State = std::variant<EmptyState, IngestState, PreparedState, QueryState>; Statement() : BaseStatement<Derived>() { std::memset(&bind_parameters_, 0, sizeof(bind_parameters_)); } ~Statement() = default; AdbcStatusCode Bind(ArrowArray* values, ArrowSchema* schema, AdbcError* error) { if (!values || !values->release) { return status::InvalidArgument(Derived::kErrorPrefix, " Bind: must provide non-NULL array") .ToAdbc(error); } else if (!schema || !schema->release) { return status::InvalidArgument(Derived::kErrorPrefix, " Bind: must provide non-NULL stream") .ToAdbc(error); } if (bind_parameters_.release) bind_parameters_.release(&bind_parameters_); MakeArrayStream(schema, values, &bind_parameters_); return ADBC_STATUS_OK; } AdbcStatusCode BindStream(ArrowArrayStream* stream, AdbcError* error) { if (!stream || !stream->release) { return status::InvalidArgument(Derived::kErrorPrefix, " BindStream: must provide non-NULL stream") .ToAdbc(error); } if (bind_parameters_.release) bind_parameters_.release(&bind_parameters_); // Move stream bind_parameters_ = *stream; std::memset(stream, 0, sizeof(*stream)); return ADBC_STATUS_OK; } AdbcStatusCode Cancel(AdbcError* error) { return ADBC_STATUS_NOT_IMPLEMENTED; } AdbcStatusCode ExecutePartitions(struct ArrowSchema* schema, struct AdbcPartitions* partitions, int64_t* rows_affected, AdbcError* error) { return ADBC_STATUS_NOT_IMPLEMENTED; } AdbcStatusCode ExecuteQuery(ArrowArrayStream* stream, int64_t* rows_affected, AdbcError* error) { return std::visit( [&](auto&& state) -> AdbcStatusCode { using T = std::decay_t<decltype(state)>; if constexpr (std::is_same_v<T, EmptyState>) { return status::InvalidState(Derived::kErrorPrefix, " Cannot ExecuteQuery without setting the query") .ToAdbc(error); } else if constexpr (std::is_same_v<T, IngestState>) { if (stream) { return status::InvalidState(Derived::kErrorPrefix, " Cannot ingest with result set") .ToAdbc(error); } RAISE_RESULT(error, int64_t rows, impl().ExecuteIngestImpl(state)); if (rows_affected) { *rows_affected = rows; } return ADBC_STATUS_OK; } else if constexpr (std::is_same_v<T, PreparedState> || std::is_same_v<T, QueryState>) { int64_t rows = 0; if (stream) { RAISE_RESULT(error, rows, impl().ExecuteQueryImpl(state, stream)); } else { RAISE_RESULT(error, rows, impl().ExecuteUpdateImpl(state)); } if (rows_affected) { *rows_affected = rows; } return ADBC_STATUS_OK; } else { static_assert(!sizeof(T), "case not implemented"); } }, state_); } AdbcStatusCode ExecuteSchema(ArrowSchema* schema, AdbcError* error) { return ADBC_STATUS_NOT_IMPLEMENTED; } AdbcStatusCode GetParameterSchema(struct ArrowSchema* schema, struct AdbcError* error) { return std::visit( [&](auto&& state) -> AdbcStatusCode { using T = std::decay_t<decltype(state)>; if constexpr (std::is_same_v<T, EmptyState>) { return status::InvalidState( Derived::kErrorPrefix, " Cannot GetParameterSchema without setting the query") .ToAdbc(error); } else if constexpr (std::is_same_v<T, IngestState>) { return status::InvalidState(Derived::kErrorPrefix, " Cannot GetParameterSchema in bulk ingestion") .ToAdbc(error); } else if constexpr (std::is_same_v<T, PreparedState>) { return impl().GetParameterSchemaImpl(state, schema).ToAdbc(error); } else if constexpr (std::is_same_v<T, QueryState>) { return status::InvalidState( Derived::kErrorPrefix, " Cannot GetParameterSchema without calling Prepare") .ToAdbc(error); } else { static_assert(!sizeof(T), "case not implemented"); } }, state_); } AdbcStatusCode Init(void* parent, AdbcError* error) { this->lifecycle_state_ = LifecycleState::kInitialized; if (auto status = impl().InitImpl(parent); !status.ok()) { return status.ToAdbc(error); } return ObjectBase::Init(parent, error); } AdbcStatusCode Prepare(AdbcError* error) { RAISE_STATUS(error, std::visit( [&](auto&& state) -> Status { using T = std::decay_t<decltype(state)>; if constexpr (std::is_same_v<T, EmptyState>) { return status::InvalidState( Derived::kErrorPrefix, " Cannot Prepare without setting the query"); } else if constexpr (std::is_same_v<T, IngestState>) { return status::InvalidState( Derived::kErrorPrefix, " Cannot Prepare without setting the query"); } else if constexpr (std::is_same_v<T, PreparedState>) { // No-op return status::Ok(); } else if constexpr (std::is_same_v<T, QueryState>) { UNWRAP_STATUS(impl().PrepareImpl(state)); state_ = PreparedState{std::move(state.query)}; return status::Ok(); } else { static_assert(!sizeof(T), "case not implemented"); } }, state_)); return ADBC_STATUS_OK; } AdbcStatusCode Release(AdbcError* error) { if (bind_parameters_.release) { bind_parameters_.release(&bind_parameters_); bind_parameters_.release = nullptr; } return impl().ReleaseImpl().ToAdbc(error); } AdbcStatusCode SetOption(std::string_view key, Option value, AdbcError* error) { auto ensure_ingest = [&]() -> IngestState& { if (!std::holds_alternative<IngestState>(state_)) { state_ = IngestState{}; } return std::get<IngestState>(state_); }; if (key == ADBC_INGEST_OPTION_MODE) { RAISE_RESULT(error, auto mode, value.AsString()); if (mode == ADBC_INGEST_OPTION_MODE_APPEND) { auto& state = ensure_ingest(); state.table_does_not_exist_ = TableDoesNotExist::kFail; state.table_exists_ = TableExists::kAppend; } else if (mode == ADBC_INGEST_OPTION_MODE_CREATE) { auto& state = ensure_ingest(); state.table_does_not_exist_ = TableDoesNotExist::kCreate; state.table_exists_ = TableExists::kFail; } else if (mode == ADBC_INGEST_OPTION_MODE_CREATE_APPEND) { auto& state = ensure_ingest(); state.table_does_not_exist_ = TableDoesNotExist::kCreate; state.table_exists_ = TableExists::kAppend; } else if (mode == ADBC_INGEST_OPTION_MODE_REPLACE) { auto& state = ensure_ingest(); state.table_does_not_exist_ = TableDoesNotExist::kCreate; state.table_exists_ = TableExists::kReplace; } else { return status::InvalidArgument(Derived::kErrorPrefix, " Invalid ingest mode '", key, "': ", value.Format()) .ToAdbc(error); } return ADBC_STATUS_OK; } else if (key == ADBC_INGEST_OPTION_TARGET_CATALOG) { if (value.has_value()) { RAISE_RESULT(error, auto catalog, value.AsString()); ensure_ingest().target_catalog = catalog; } else { ensure_ingest().target_catalog = std::nullopt; } return ADBC_STATUS_OK; } else if (key == ADBC_INGEST_OPTION_TARGET_DB_SCHEMA) { if (value.has_value()) { RAISE_RESULT(error, auto schema, value.AsString()); ensure_ingest().target_schema = schema; } else { ensure_ingest().target_schema = std::nullopt; } return ADBC_STATUS_OK; } else if (key == ADBC_INGEST_OPTION_TARGET_TABLE) { RAISE_RESULT(error, auto table, value.AsString()); ensure_ingest().target_table = table; return ADBC_STATUS_OK; } else if (key == ADBC_INGEST_OPTION_TEMPORARY) { RAISE_RESULT(error, auto temporary, value.AsBool()); ensure_ingest().temporary = temporary; return ADBC_STATUS_OK; } return impl().SetOptionImpl(key, value).ToAdbc(error); } AdbcStatusCode SetSqlQuery(const char* query, AdbcError* error) { RAISE_STATUS(error, std::visit( [&](auto&& state) -> Status { using T = std::decay_t<decltype(state)>; if constexpr (std::is_same_v<T, EmptyState>) { state_ = QueryState{ std::string(query), }; return status::Ok(); } else if constexpr (std::is_same_v<T, IngestState>) { state_ = QueryState{ std::string(query), }; return status::Ok(); } else if constexpr (std::is_same_v<T, PreparedState>) { state_ = QueryState{ std::string(query), }; return status::Ok(); } else if constexpr (std::is_same_v<T, QueryState>) { state.query = std::string(query); return status::Ok(); } else { static_assert(!sizeof(T), "info value type not implemented"); } }, state_)); return ADBC_STATUS_OK; } AdbcStatusCode SetSubstraitPlan(const uint8_t* plan, size_t length, AdbcError* error) { return ADBC_STATUS_NOT_IMPLEMENTED; } Result<int64_t> ExecuteIngestImpl(IngestState& state) { return status::NotImplemented(Derived::kErrorPrefix, " Bulk ingest is not implemented"); } Result<int64_t> ExecuteQueryImpl(PreparedState& state, ArrowArrayStream* stream) { return status::NotImplemented(Derived::kErrorPrefix, " ExecuteQuery is not implemented"); } Result<int64_t> ExecuteQueryImpl(QueryState& state, ArrowArrayStream* stream) { return status::NotImplemented(Derived::kErrorPrefix, " ExecuteQuery is not implemented"); } Result<int64_t> ExecuteUpdateImpl(PreparedState& state) { return status::NotImplemented(Derived::kErrorPrefix, " ExecuteQuery (update) is not implemented"); } Result<int64_t> ExecuteUpdateImpl(QueryState& state) { return status::NotImplemented(Derived::kErrorPrefix, " ExecuteQuery (update) is not implemented"); } Status GetParameterSchemaImpl(PreparedState& state, ArrowSchema* schema) { return status::NotImplemented(Derived::kErrorPrefix, " GetParameterSchema is not implemented"); } Status InitImpl(void* parent) { return status::Ok(); } Status PrepareImpl(QueryState& state) { return status::NotImplemented(Derived::kErrorPrefix, " Prepare is not implemented"); } Status ReleaseImpl() { return status::Ok(); } Status SetOptionImpl(std::string_view key, Option value) { return status::NotImplemented(Derived::kErrorPrefix, " Unknown statement option ", key, "=", value.Format()); } protected: ArrowArrayStream bind_parameters_; private: State state_ = State(EmptyState{}); Derived& impl() { return static_cast<Derived&>(*this); } }; } // namespace adbc::driver