extensions/sql/data/SociConnectors.cpp (137 lines of code) (raw):
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "SociConnectors.h"
#include "core/logging/LoggerFactory.h"
namespace org::apache::nifi::minifi::sql {
void SociRow::setIterator(const soci::rowset<soci::row>::iterator& iter) {
current_ = iter;
}
soci::rowset<soci::row>::iterator SociRow::getIterator() const {
return current_;
}
void SociRow::next() {
++current_;
}
std::size_t SociRow::size() const {
return current_->size();
}
std::string SociRow::getColumnName(std::size_t index) const {
return current_->get_properties(index).get_name();
}
bool SociRow::isNull(std::size_t index) const {
return current_->get_indicator(index) == soci::i_null;
}
DataType SociRow::getDataType(std::size_t index) const {
switch (const auto dataType = current_->get_properties(index).get_data_type()) {
case soci::data_type::dt_string: {
return DataType::STRING;
}
case soci::data_type::dt_double: {
return DataType::DOUBLE;
}
case soci::data_type::dt_integer: {
return DataType::INTEGER;
}
case soci::data_type::dt_long_long: {
return DataType::LONG_LONG;
}
case soci::data_type::dt_unsigned_long_long: {
return DataType::UNSIGNED_LONG_LONG;
}
case soci::data_type::dt_date: {
return DataType::DATE;
}
default: {
throw minifi::Exception(PROCESSOR_EXCEPTION, "SQLRowsetProcessor: Unsupported data type " + std::to_string(dataType));
}
}
}
std::string SociRow::getString(std::size_t index) const {
return current_->get<std::string>(index);
}
double SociRow::getDouble(std::size_t index) const {
return current_->get<double>(index);
}
int SociRow::getInteger(std::size_t index) const {
return current_->get<int>(index);
}
long long SociRow::getLongLong(std::size_t index) const { // NOLINT(google-runtime-int)
return current_->get<long long>(index); // NOLINT(google-runtime-int)
}
unsigned long long SociRow::getUnsignedLongLong(std::size_t index) const { // NOLINT(google-runtime-int)
return current_->get<unsigned long long>(index); // NOLINT(google-runtime-int)
}
std::tm SociRow::getDate(std::size_t index) const {
return current_->get<std::tm>(index);
}
void SociRowset::reset() {
current_row_.setIterator(rowset_.begin());
}
bool SociRowset::is_done() {
return current_row_.getIterator() == rowset_.end();
}
Row& SociRowset::getCurrent() {
return current_row_;
}
void SociRowset::next() {
current_row_.next();
}
SociStatement::SociStatement(soci::session &session, const std::string &query)
: Statement(query), session_(session), logger_(core::logging::LoggerFactory<SociStatement>::getLogger()) {}
std::unique_ptr<Rowset> SociStatement::execute(const std::vector<std::string>& args) {
try {
auto stmt = session_.prepare << query_;
for (auto& arg : args) {
// binds arguments to the prepared statement
stmt.operator,(soci::use(arg));
}
return std::make_unique<SociRowset>(stmt);
} catch (const soci::soci_error& ex) {
logger_->log_error("Error while evaluating query, type: {}, what: {}", typeid(ex).name(), ex.what());
if (ex.get_error_category() == soci::soci_error::error_category::connection_error
|| ex.get_error_category() == soci::soci_error::error_category::system_error) {
throw sql::ConnectionError(ex.get_error_message());
} else {
throw sql::StatementError(ex.get_error_message());
}
} catch (const std::exception& ex) {
logger_->log_error("Error while evaluating query, type: {}, what: {}", typeid(ex).name(), ex.what());
throw sql::StatementError(ex.what());
}
}
void SociSession::begin() {
session_.begin();
}
void SociSession::commit() {
session_.commit();
}
void SociSession::rollback() {
session_.rollback();
}
void SociSession::execute(const std::string &statement) {
session_ << statement;
}
ODBCConnection::ODBCConnection(std::string connectionString)
: connection_string_(std::move(connectionString)) {
session_ = std::make_unique<soci::session>(getSessionParameters());
}
bool ODBCConnection::connected(std::string& exception) const {
try {
exception.clear();
// According to https://stackoverflow.com/questions/3668506/efficient-sql-test-query-or-validation-query-that-will-work-across-all-or-most by Rob Hruska,
// 'select 1' works for: H2, MySQL, Microsoft SQL Server, PostgreSQL, SQLite. For Oracle 'SELECT 1 FROM DUAL' works.
prepareStatement("select 1")->execute();
return true;
} catch (const std::exception& e) {
exception = e.what();
return false;
}
}
std::unique_ptr<sql::Statement> ODBCConnection::prepareStatement(const std::string& query) const {
return std::make_unique<sql::SociStatement>(*session_, query);
}
std::unique_ptr<Session> ODBCConnection::getSession() const {
return std::make_unique<sql::SociSession>(*session_);
}
soci::connection_parameters ODBCConnection::getSessionParameters() const {
static const soci::backend_factory &backEnd = *soci::factory_odbc();
soci::connection_parameters parameters(backEnd, connection_string_);
parameters.set_option(soci::odbc_option_driver_complete, "0" /* SQL_DRIVER_NOPROMPT */);
return parameters;
}
} // namespace org::apache::nifi::minifi::sql