cdk/protocol/mysqlx/session.cc (335 lines of code) (raw):
/*
* Copyright (c) 2015, 2024, Oracle and/or its affiliates.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2.0, as
* published by the Free Software Foundation.
*
* This program is designed to work with certain software (including
* but not limited to OpenSSL) that is licensed under separate terms, as
* designated in a particular file or component or in included license
* documentation. The authors of MySQL hereby grant you an additional
* permission to link the program and your derivative works with the
* separately licensed software that they have either included with
* the program or referenced in the documentation.
*
* Without limiting anything contained in the foregoing, this file,
* which is part of Connector/C++, is also subject to the
* Universal FOSS Exception, version 1.0, a copy of which can be found at
* https://oss.oracle.com/licenses/universal-foss-exception.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU General Public License, version 2.0, for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
/*
Implementation of mysqlx protocol API: session handling
=======================================================
*/
#include "protocol.h"
#include "builders.h"
PUSH_SYS_WARNINGS_CDK
#include <iostream>
POP_SYS_WARNINGS_CDK
PUSH_PB_WARNINGS
#include "protobuf/mysqlx_session.pb.h"
#include "protobuf/mysqlx_crud.pb.h"
POP_PB_WARNINGS
using namespace cdk::foundation;
using namespace google::protobuf;
using namespace cdk::protocol::mysqlx;
/*
Implementation of Protocol methods using the internal implementation.
*/
namespace cdk {
namespace protocol {
namespace mysqlx {
// Client-side API
/*
  Document processor which turns each key/value pair of a capabilities
  document into one Capability entry of a Mysqlx::Connection::Capabilities
  message.

  reset() must be called before the builder is used: until then m_msg is
  null and key_val() would dereference it.
*/

struct Cap_builder : api::Any::Document::Processor
{
  Mysqlx::Connection::Capabilities *m_msg;
  Any_builder m_ab;

  Cap_builder() : m_msg(nullptr)
  {}

  // Make following key_val() calls append to the capabilities list of msg.

  void reset(Mysqlx::Connection::CapabilitiesSet &msg)
  {
    m_msg = msg.mutable_capabilities();
  }

  /*
    Add a capability named key and return the processor which stores
    the value reported for it.
  */

  Any_prc* key_val(const string &key)
  {
    Mysqlx::Connection::Capability &entry = *m_msg->add_capabilities();
    entry.set_name(key);

    // The shared value builder is re-targeted at the new entry's value.
    m_ab.reset(*entry.mutable_value());
    return &m_ab;
  }
};
// Forward the request to the protocol implementation object.

void Protocol::start_Pipeline()
{
  auto &impl = get_impl();
  impl.start_Pipeline();
}
// Return the operation produced by the implementation's snd_Pipeline().

Protocol::Op& Protocol::snd_Pipeline()
{
  auto &impl = get_impl();
  return impl.snd_Pipeline();
}
// Forward the request to the protocol implementation object.

void Protocol::clear_Pipeline()
{
  auto &impl = get_impl();
  impl.clear_Pipeline();
}
/*
  Pass the compression type and the threshold, unchanged, to the protocol
  implementation object.
*/

void Protocol::set_compression(Compression_type::value compression_type,
                               size_t threshold)
{
  auto &impl = get_impl();
  impl.set_compression(compression_type, threshold);
}
/*
  Build a CapabilitiesSet message from the given capabilities document and
  pass it to snd_start() with message type cli_CapabilitiesSet.

  Returns the operation returned by snd_start().
*/

Protocol::Op& Protocol::snd_CapabilitiesSet(const api::Any::Document& caps)
{
  Mysqlx::Connection::CapabilitiesSet caps_msg;

  // Let the document describe itself to a builder which fills the message.
  Cap_builder cap_builder;
  cap_builder.reset(caps_msg);
  caps.process(cap_builder);

  auto &impl = get_impl();
  return impl.snd_start(caps_msg, msg_type::cli_CapabilitiesSet);
}
/*
  Build an AuthenticateStart message carrying the mechanism name, the
  authentication data and the initial response, then pass it to snd_start()
  with message type cli_AuthenticateStart.

  Both byte ranges are always stored in the message, even when empty.
  Returns the operation returned by snd_start().
*/

Protocol::Op& Protocol::snd_AuthenticateStart(const char* mechanism,
                                              bytes data,
                                              bytes response)
{
  Mysqlx::Session::AuthenticateStart start_msg;

  start_msg.set_mech_name(mechanism);

  const void *auth_data = static_cast<const void*>(data.begin());
  start_msg.set_auth_data(auth_data, data.size());

  const void *initial = static_cast<const void*>(response.begin());
  start_msg.set_initial_response(initial, response.size());

  auto &impl = get_impl();
  return impl.snd_start(start_msg, msg_type::cli_AuthenticateStart);
}
/*
  Build an AuthenticateContinue message carrying the given authentication
  data and pass it to snd_start() with message type
  cli_AuthenticateContinue.

  Returns the operation returned by snd_start().
*/

Protocol::Op& Protocol::snd_AuthenticateContinue(bytes data)
{
  Mysqlx::Session::AuthenticateContinue continue_msg;

  const void *auth_data = static_cast<const void*>(data.begin());
  continue_msg.set_auth_data(auth_data, data.size());

  auto &impl = get_impl();
  return impl.snd_start(continue_msg, msg_type::cli_AuthenticateContinue);
}
struct Expectation_builder : api::Expectations::Processor, api::Expectation_processor
{
Mysqlx::Expect::Open *m_msg;
Expectation_builder(Mysqlx::Expect::Open *msg) : m_msg(msg)
{}
void set(uint32_t key)
{
Mysqlx::Expect::Open_Condition *cond = m_msg->add_cond();
cond->set_op(Mysqlx::Expect::Open_Condition_ConditionOperation_EXPECT_OP_SET);
cond->set_condition_key(key);
}
void set(uint32_t key, bytes data)
{
Mysqlx::Expect::Open_Condition *cond = m_msg->add_cond();
cond->set_op(Mysqlx::Expect::Open_Condition_ConditionOperation_EXPECT_OP_SET);
cond->set_condition_key(key);
cond->set_condition_value(data.begin(), data.size());
}
void unset(uint32_t key)
{
Mysqlx::Expect::Open_Condition *cond = m_msg->add_cond();
cond->set_op(Mysqlx::Expect::Open_Condition_ConditionOperation_EXPECT_OP_UNSET);
cond->set_condition_key(key);
}
Element_prc *list_el()
{
return this;
}
};
/*
  Build an Expect::Open message with one condition per expectation reported
  by exp and pass it to snd_start() with message type cli_ExpectOpen.

  If reset is true the context operation is EXPECT_CTX_EMPTY, otherwise it
  is EXPECT_CTX_COPY_PREV. Returns the operation returned by snd_start().
*/

Protocol::Op& Protocol::snd_Expect_Open(api::Expectations &exp, bool reset)
{
  Mysqlx::Expect::Open open_msg;

  Expectation_builder cond_builder(&open_msg);
  exp.process(cond_builder);

  if (reset)
    open_msg.set_op(Mysqlx::Expect::Open_CtxOperation_EXPECT_CTX_EMPTY);
  else
    open_msg.set_op(Mysqlx::Expect::Open_CtxOperation_EXPECT_CTX_COPY_PREV);

  auto &impl = get_impl();
  return impl.snd_start(open_msg, msg_type::cli_ExpectOpen);
}
/*
  Pass an Expect::Close message with no fields set to snd_start() with
  message type cli_ExpectClose and return the resulting operation.
*/

Protocol::Op& Protocol::snd_Expect_Close()
{
  Mysqlx::Expect::Close close_msg;

  auto &impl = get_impl();
  return impl.snd_start(close_msg, msg_type::cli_ExpectClose);
}
/*
  Base of the receive operation which handles replies to authentication
  messages. Only AuthenticateOk and AuthenticateContinue are accepted;
  any other message type is reported as UNEXPECTED.

  Message handling is done by process_msg_with(), which is specialized
  outside the class for the supported message/processor pairs.
*/

class Rcv_auth_base : public Op_rcv
{
public:

  Rcv_auth_base(Protocol_impl &proto) : Op_rcv(proto)
  {}

  // Read the next message, reporting it to the given processor.

  void resume(Auth_processor &prc)
  {
    read_msg(prc);
  }

  // Tell whether a message of the given type is accepted by this operation.

  Next_msg do_next_msg(msg_type_t type)
  {
    const bool is_auth_reply = type == msg_type::AuthenticateOk
                            || type == msg_type::AuthenticateContinue;

    if (is_auth_reply)
      return EXPECTED;

    return UNEXPECTED;
  }

  /*
    Fallback used for any message/processor pair which has no explicit
    specialization: such a pair is reported as an error.
  */

  template<class MSG, class PRC>
  void process_msg_with(MSG&, PRC&)
  {
    // TODO: better error description (message/processor type info)
    throw_error("Invalid processor used to process server reply");
  }
};
/*
  AuthenticateOk: report the authentication data carried by the message
  through Auth_processor::auth_ok().
*/

template<>
void Rcv_auth_base::process_msg_with(Mysqlx::Session::AuthenticateOk &msg,
                                     Auth_processor &prc)
{
  const auto &payload = msg.auth_data();

  // The bytes object is built from a non-const byte pointer into the
  // message's own buffer; nothing is copied here.
  byte *first = reinterpret_cast<byte*>(const_cast<char*>(payload.data()));
  bytes auth_data(first, payload.length());

  prc.auth_ok(auth_data);
}
/*
  AuthenticateContinue: report the authentication data carried by the
  message through Auth_processor::auth_continue().
*/

template<>
void Rcv_auth_base::process_msg_with(Mysqlx::Session::AuthenticateContinue &msg,
                                     Auth_processor &prc)
{
  const auto &payload = msg.auth_data();

  // The bytes object is built from a non-const byte pointer into the
  // message's own buffer; nothing is copied here.
  byte *first = reinterpret_cast<byte*>(const_cast<char*>(payload.data()));
  bytes auth_data(first, payload.length());

  prc.auth_continue(auth_data);
}
/*
  Receive operation for authentication replies: Rcv_auth_base wrapped in
  the message dispatcher, with the stored processor treated as an
  Auth_processor.
*/

class Rcv_auth : public Message_dispatcher<Rcv_auth_base>
{
public:

  Rcv_auth(Protocol_impl &impl) : Dispatcher(impl)
  {}

  // Dispatch the message to process_msg_with() using the stored processor.

  void do_process_msg(msg_type_t type, Message &msg)
  {
    Auth_processor *auth_prc = static_cast<Auth_processor*>(m_prc);
    Dispatcher::process_msg_with(type, msg, *auth_prc);
  }
};
/*
  Start an Rcv_auth receive operation which reports to the given processor
  and return that operation.
*/

Protocol::Op& Protocol::rcv_AuthenticateReply(Auth_processor &prc)
{
  auto &impl = get_impl();
  return impl.rcv_start<Rcv_auth>(prc);
}
// Parsing and processing protocol notices.
/*
  Parse the payload of a SessionStateChanged notice and report the change
  to the given processor.

  notice  raw bytes of the notice payload
  prc     processor which receives the decoded state change

  Throws (via THROW) if the payload can not be parsed as a
  SessionStateChanged message. PRODUCED_MESSAGE and any parameter not
  listed in the switch are ignored.

  NOTE: The shape of the reported value(s) is checked only with assert(),
  so these checks are not present in builds which define NDEBUG.
*/

template<>
void process_notice<notice_type::SessionStateChange>(
  const bytes &notice,
  SessionState_processor &prc
)
{
  Mysqlx::Notice::SessionStateChanged msg;

  if (!msg.ParseFromString(std::string(notice.begin(), notice.end())))
    THROW("Could not parse notice payload");

#ifdef DEBUG_PROTOBUF

  using std::cerr;
  using std::endl;

  cerr << endl;
  cerr << "<--- Notice payload:" << endl;
  cerr << msg.DebugString();
  cerr << "<---" << endl << endl;

#endif

  switch (msg.param())
  {
  case Mysqlx::Notice::SessionStateChanged::CLIENT_ID_ASSIGNED:
    {
      assert(msg.value_size() == 1 && msg.value(0).has_v_unsigned_int());
      uint64_t id = msg.value(0).v_unsigned_int();
      // The id is passed on as unsigned long, so it must fit in that type.
      assert(id < std::numeric_limits<unsigned long>::max());
      prc.client_id(static_cast<unsigned long>(id));
      break;
    }

  case Mysqlx::Notice::SessionStateChanged::ACCOUNT_EXPIRED:
    prc.account_expired();
    break;

  case Mysqlx::Notice::SessionStateChanged::CURRENT_SCHEMA:
    assert(msg.value_size() == 1 && msg.value(0).has_v_string());
    // NOTE: Assuming the reported schema name is in utf8 encoding
    prc.current_schema(msg.value(0).v_string().value());
    break;

  case Mysqlx::Notice::SessionStateChanged::ROWS_AFFECTED:
    assert(msg.value_size() == 1 && msg.value(0).has_v_unsigned_int());
    prc.row_stats(prc.ROWS_AFFECTED, msg.value(0).v_unsigned_int());
    break;

  case Mysqlx::Notice::SessionStateChanged::ROWS_FOUND:
    assert(msg.value_size() == 1 && msg.value(0).has_v_unsigned_int());
    prc.row_stats(prc.ROWS_FOUND, msg.value(0).v_unsigned_int());
    break;

  case Mysqlx::Notice::SessionStateChanged::ROWS_MATCHED:
    assert(msg.value_size() == 1 && msg.value(0).has_v_unsigned_int());
    prc.row_stats(prc.ROWS_MATCHED, msg.value(0).v_unsigned_int());
    break;

  case Mysqlx::Notice::SessionStateChanged::GENERATED_INSERT_ID:
    assert(msg.value_size() == 1 && msg.value(0).has_v_unsigned_int());
    prc.last_insert_id(msg.value(0).v_unsigned_int());
    break;

  case Mysqlx::Notice::SessionStateChanged::TRX_COMMITTED:
    prc.trx_event(prc.COMMIT);
    break;

  case Mysqlx::Notice::SessionStateChanged::TRX_ROLLEDBACK:
    prc.trx_event(prc.ROLLBACK);
    break;

  case Mysqlx::Notice::SessionStateChanged::GENERATED_DOCUMENT_IDS:
    // One id is reported per value present in the notice.
    for (const auto &val : msg.value())
    {
      prc.generated_document_id(val.v_octets().value());
    }
    break;

  case Mysqlx::Notice::SessionStateChanged::PRODUCED_MESSAGE:
  default:
    break;
  }
}
/*
  Parse the payload of a Warning notice and report it to the given error
  processor.

  notice  raw bytes of the notice payload
  prc     processor which receives the code, level and message text

  The level of the notice is mapped to a number: ERROR -> 2, WARNING -> 1,
  NOTE and anything else -> 0. A default-constructed sql_state_t is passed
  as the SQL state.

  Throws (via THROW) if the payload can not be parsed as a Warning message.
*/

template<>
void process_notice<notice_type::Warning>(
  const bytes &notice,
  Error_processor &prc
)
{
  Mysqlx::Notice::Warning msg;

  if (!msg.ParseFromString(std::string(notice.begin(), notice.end())))
    THROW("Could not parse notice payload");

#ifdef DEBUG_PROTOBUF

  using std::cerr;
  using std::endl;

  cerr << endl;
  cerr << "<--- Notice payload:" << endl;
  cerr << msg.DebugString();
  cerr << "<---" << endl << endl;

#endif

  short int level;

  switch (msg.level())
  {
  case Mysqlx::Notice::Warning::ERROR:
    level = 2;
    break;

  case Mysqlx::Notice::Warning::WARNING:
    level = 1;
    break;

  case Mysqlx::Notice::Warning::NOTE:
  default:
    level = 0;
    break;
  }

  prc.error(msg.code(), level, sql_state_t(), msg.msg());
}
// Server-side API
/*
  Build an AuthenticateContinue message carrying the given authentication
  data and pass it to snd_start() with message type AuthenticateContinue.

  Returns the operation returned by snd_start().
*/

Protocol::Op& Protocol_server::snd_AuthenticateContinue(bytes data)
{
  Mysqlx::Session::AuthenticateContinue continue_msg;
  continue_msg.set_auth_data(data.begin(), data.size());

  auto &impl = get_impl();
  return impl.snd_start(continue_msg, msg_type::AuthenticateContinue);
}
/*
  Build an AuthenticateOk message carrying the given authentication data
  and pass it to snd_start() with message type AuthenticateOk.

  Returns the operation returned by snd_start().
*/

Protocol::Op& Protocol_server::snd_AuthenticateOK(bytes data)
{
  Mysqlx::Session::AuthenticateOk ok_msg;
  ok_msg.set_auth_data(data.begin(), data.size());

  auto &impl = get_impl();
  return impl.snd_start(ok_msg, msg_type::AuthenticateOk);
}
/*
  Base of the server-side receive operation which handles the initial
  messages of a client. Only cli_AuthenticateStart and
  cli_AuthenticateContinue are accepted; any other message type is
  reported as UNEXPECTED.

  Message handling is done by process_msg_with(), which is specialized
  outside the class for the supported message/processor pairs.

  NOTE(review): the client-side counterpart Rcv_auth_base names its
  message type hook do_next_msg() while this class uses next_msg() -
  confirm against Op_rcv which of the two is intended here.
*/

class Rcv_init_base : public Op_rcv
{
public:

  Rcv_init_base(Protocol_impl &proto) : Op_rcv(proto)
  {}

  // Read the next message, reporting it to the given processor.

  void resume(Init_processor &prc)
  {
    read_msg(prc);
  }

  // Tell whether a message of the given type is accepted by this operation.

  Next_msg next_msg(msg_type_t type)
  {
    const bool is_init_msg = type == msg_type::cli_AuthenticateStart
                          || type == msg_type::cli_AuthenticateContinue;

    if (is_init_msg)
      return EXPECTED;

    return UNEXPECTED;
  }

  /*
    Fallback used for any message/processor pair which has no explicit
    specialization: such a pair is reported as an error.
  */

  template<class MSG, class PRC>
  void process_msg_with(MSG&, PRC&)
  {
    // TODO: better error description (message/processor type info)
    throw_error("Invalid processor used to process server reply");
  }
};
/*
  AuthenticateStart: report the mechanism name, the authentication data and
  the initial response carried by the message through
  Init_processor::auth_start().
*/

template<>
void Rcv_init_base::process_msg_with(Mysqlx::Session::AuthenticateStart &msg,
                                     Init_processor &ip)
{
  const auto &auth_payload = msg.auth_data();
  const auto &resp_payload = msg.initial_response();

  // Both bytes objects are built from non-const byte pointers into the
  // message's own buffers; nothing is copied here.
  bytes data(
    reinterpret_cast<byte*>(const_cast<char*>(auth_payload.data())),
    auth_payload.length()
  );
  bytes response(
    reinterpret_cast<byte*>(const_cast<char*>(resp_payload.data())),
    resp_payload.length()
  );

  ip.auth_start(msg.mech_name().c_str(), data, response);
}
/*
  AuthenticateContinue: report the authentication data carried by the
  message through Init_processor::auth_continue().
*/

template<>
void Rcv_init_base::process_msg_with(Mysqlx::Session::AuthenticateContinue &msg,
                                     Init_processor &ip)
{
  const auto &payload = msg.auth_data();

  // The bytes object is built from a non-const byte pointer into the
  // message's own buffer; nothing is copied here.
  byte *first = reinterpret_cast<byte*>(const_cast<char*>(payload.data()));
  bytes auth_data(first, payload.length());

  ip.auth_continue(auth_data);
}
/*
  Server-side receive operation for initial client messages: Rcv_init_base
  wrapped in the message dispatcher, with the stored processor treated as
  an Init_processor.

  NOTE(review): the client-side counterpart Rcv_auth names its handler
  do_process_msg() while this class uses process_msg() - confirm against
  Op_rcv which of the two is intended here.
*/

class Rcv_init : public Message_dispatcher<Rcv_init_base>
{
public:

  Rcv_init(Protocol_impl &impl) : Dispatcher(impl)
  {}

  // Dispatch the message to process_msg_with() using the stored processor.

  void process_msg(msg_type_t type, Message &msg)
  {
    Init_processor *init_prc = static_cast<Init_processor*>(m_prc);
    Dispatcher::process_msg_with(type, msg, *init_prc);
  }
};
/*
  Start an Rcv_init receive operation which reports to the given processor
  and return that operation.
*/

Protocol::Op& Protocol_server::rcv_InitMessage(Init_processor &prc)
{
  auto &impl = get_impl();
  return impl.rcv_start<Rcv_init>(prc);
}
}}} // cdk::protocol::mysqlx