cdk/protocol/mysqlx/rset.cc (295 lines of code) (raw):
/*
* Copyright (c) 2015, 2024, Oracle and/or its affiliates.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2.0, as
* published by the Free Software Foundation.
*
* This program is designed to work with certain software (including
* but not limited to OpenSSL) that is licensed under separate terms, as
* designated in a particular file or component or in included license
* documentation. The authors of MySQL hereby grant you an additional
* permission to link the program and your derivative works with the
* separately licensed software that they have either included with
* the program or referenced in the documentation.
*
* Without limiting anything contained in the foregoing, this file,
* which is part of Connector/C++, is also subject to the
* Universal FOSS Exception, version 1.0, a copy of which can be found at
* https://oss.oracle.com/licenses/universal-foss-exception.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU General Public License, version 2.0, for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
/*
Implementation of mysqlx protocol API: result sets
==================================================
Class Rcv_result implements an asynchronous operation which reads server
reply after a query or command. It derives from Op_rcv using the general
message processing framework defined there.
*/
#include "protocol.h"
PUSH_PB_WARNINGS
#include "protobuf/mysqlx_sql.pb.h"
POP_PB_WARNINGS
using namespace cdk::foundation;
using namespace google::protobuf;
using namespace cdk::protocol::mysqlx;
namespace cdk {
namespace protocol {
namespace mysqlx {
/*
Base of asynchronous operation that processes server reply to a query
or statement.
Such reply is processed in several stages, each stage initiated by an
appropriate Protocol::rcv_XXX() call (and user is responsible for making
these calls in the correct order, see docs/???).
The Rcv_result class inherits from Op_rcv and implements a multi-stage
receive operation. It means that after creating the operation it signals
completion (via is_completed() method) at the end of the first processing
stage (such as reading meta-data). To continue processing server reply,
the operation should be resumed, after which it can be continued again until
the next stage is completed. If there are no more processing stages to
be done, method is_done() returns true (and then it is an error to resume
the operation). Thus, the usage pattern for Rcv_result operation is as
follows:
Rcv_result op(...);
// perform first stage of processing
while (!op.is_completed())
op.cont();
// first stage is completed, but more might be needed
if (!op.is_done())
{
// start next stage of processing
op.resume(...);
// execute it
while (!op.is_completed())
op.cont();
}
*/
class Rcv_result_base : public Op_rcv
{
public:
Rcv_result_base(Protocol_impl&);
void resume(Stmt_processor&);
void resume(Row_processor&);
void resume(Mdata_processor&);
protected:
/*
State tells at which stage the processing of the server reply is.
state MDATA { reading result-set meta-data (if any) }
state ROWS { reading result-set rows (if any) }
state CLOSE { reding closing StmtExecuteOk packet }
state DONE {
processing server reply is completed and there are no more
stages to be performed.
}
*/
enum result_state_t
{ START, MDATA, ROWS, CLOSE, DONE };
result_state_t m_result_state =START, m_next_state;
row_count_t m_rcount;
col_count_t m_ccount;
bool is_done() const
{
return DONE == m_result_state;
}
/*
These functions determine how to process each incoming message and
whether to continue reading the following messages after the current
one.
*/
Next_msg next_msg(msg_type_t);
Next_msg do_next_msg(msg_type_t);
bool process_next();
bool do_process_next();
/*
Dispatchers for different message and processor types.
If this method is not overridden in Rcv_result for a given <MSG,PRC>
pair, then it means a miss-match and an error is thrown.
*/
template<class MSG, class PRC>
void process_msg_with(MSG&, PRC&)
{
// TODO: better error description (message/processor type info)
throw_error("Invalid processor used to process server reply");
}
};
class Rcv_result : public Message_dispatcher<Rcv_result_base>
{
public:
Rcv_result(Protocol_impl &proto) : Dispatcher(proto)
{}
/*
These methods handle incoming, parsed messages. They pass them
to appropriate process_msg_with<MSG,PRC>() dispatchers.
*/
void process_msg(msg_type_t, Message&);
void do_process_msg(msg_type_t, Message&);
};
Rcv_result_base::Rcv_result_base(Protocol_impl &proto)
: Op_rcv(proto)
, m_result_state(START)
, m_rcount(0)
, m_ccount(0)
{
}
/*
Once created, Rcv_result operation is resumed after each completed
processing stage. This starts new processing stage which uses new
processor to report data from this stage. The processor type must
match the processing stage. If this is the case, new stage is started
by initiating reading of the next message (read_msg() method).
*/
void Rcv_result_base::resume(Mdata_processor &prc)
{
if (START != m_result_state && MDATA != m_result_state)
throw_error("Rcv_result: incorrect resume: attempt to read meta-data"); //TODO: Improve error report
m_ccount = 0;
m_completed = false;
read_msg(prc);
}
void Rcv_result_base::resume(Stmt_processor &prc)
{
if (CLOSE != m_result_state || !m_completed)
throw_error("Rcv_result: incorrect resume: attempt to read final OK"); //TODO: Improve error report
m_completed= false;
read_msg(prc);
}
void Rcv_result_base::resume(Row_processor &prc)
{
if (ROWS != m_result_state || !m_completed)
throw_error("Rcv_result: incorrect resume: attempt to read rows"); //TODO: Improve error report
// reset the row counter
m_rcount = 0;
m_completed= false;
read_msg(prc);
}
/*
Before processing each message, m_next_state is set to the current state
of the operation, and after processing each message, m_reslut_state is set
to value of m_next_state. This achieves two goals:
- member m_result_state keeps the current state during message processing
(as it is changed only *after* message is processed).
- if processing logic do not set m_next_state, the state remains unchanged.
To maintain the state, methods next_msg() and process_next() are overridden.
First one is called after reading message header but before processing its
payload and the second one is called after processing the payload.
*/
Op_rcv::Next_msg Rcv_result_base::next_msg(msg_type_t type)
{
m_next_state = m_result_state;
return Op_rcv::next_msg(type);
}
bool Rcv_result_base::process_next()
{
m_result_state = m_next_state;
return Op_rcv::process_next();
}
/*
Determine whether next received message is expected, whether it ends the
current processing stage and if yes, whether it will be processed in the
next stage.
This method is called after reading message header, but before reading and
processing its payload (See Rcv_op::???). It maintains the state-machine
for processing server reply. After looking at the current state the logic
of this method determines the following things:
- Whether message of this type is expected, if not UNEXPECTED is returned.
- Whether this message ends current processing stage, if this is the case
then m_completed flag is set.
- In case current stage is ended, whether this message should be considered
part of the ended stage or of the new stage. If it belongs to the ended
stage then EXPECTED is returned. Otherwise STOP is returned and the message
will be processod again when the new stage is started.
- What is the next state after processing this message, this is stored in
m_next_state.
Note: current state is changed to the new one only after processing message
payload (so that state information is correct while processing the payload).
However, if current message will be processed in the next stage (this
method returns STOP), then the current state iformation needs to be updated
here.
Structure of a server reply which is traced by the state machine.
<reply> ::= (<rset> <more>)? StmtExecuteOk
<more> ::= FetchDone
| FetchDoneMoreResultsets <rset>? <more>
<rset> ::= MetaData+ Row*
Below are few examples of valid message sequences in server reply and how
they are distrbuted between different processing stages:
A = reading meta-data, B = reading rows, C = reading final OK.
1. A:[MetaData ...] B:[Row ... FetchDone] C:[StmtExecuteOk]
2. A:[MetaData ... FetchDone] C:[StmtExecuteOk]
3. A:[] C:[StmtExecuteOk]
4. A:[MetaData ...] B:[Row ... FetchDoneMoreResultsets] A:[MetaData ...] ... C:[StmtExecuteOk]
5. A:[MetaData ...] B:[Row ... FetchDoneMoreResultsets] A:[FetchDone] C:[StmtExecuteOk]
Example 1 is a typical result set with rows. Example 2 is a result-set
without any rows in it. Example 3 is a server reply without a result-set
(such as after INSERT/UPDATE statement). Example 4 is a multi-result-set.
Example 5 shows a multi-result set with no result-set at the end (such
sequence can be sent after stored routine exectuion).
TODO: Handle result-set for stored routine output parameters when xplugin
supports it.
*/
Op_rcv::Next_msg Rcv_result_base::do_next_msg(msg_type_t type)
{
switch (m_result_state)
{
case START:
if (msg_type::Ok == type)
{
m_next_state = DONE;
m_completed = true;
return EXPECTED;
}
m_next_state = MDATA;
// fall through to MDATA case
case MDATA:
/*
In this state we expect (<rset>? <more>)? StmtExecuteOk sequence. It can
start with one of the following messages:
- ColumnMetaData - in this case we have a row-set (possibly empty), we
continue in MDATA state until we see a row message
or FetchDoneXXX which terminates empty row-set;
- Row - such message should appear only after reading some
ColumnMetaData ones and it starts the sequence of rows
from the row-set;
- StmtExecuteOK - this is the case where server reply contains no result
set;
- FetchDoneXXX - this is the case where there is no <rset> part but
possibly more result sets follow;
*/
switch (type)
{
case msg_type::ColumnMetaData: return EXPECTED;
/*
If we see Row message we move to ROWS state and will start next stage
(reading rows). The Row message belongs to the next stage.
*/
case msg_type::Row:
if (0 == m_ccount)
return UNEXPECTED;
m_next_state = ROWS;
break;
/*
If we see FetchDoneXXX then there are 2 cases:
1. If there was some meta-data info before (m_ccount > 0) then we have an
empty row-set without any rows in it. We will start row reading stage to
report 0 rows. This message will be part of the row reading stage.
2. If there was no meta-data info before (m_ccount == 0) then there is
no result set. We consume the current message as part of this stage, end
the meta-data stage and proceed to the next one (either CLOSE or MDATA
if another rset follows).
*/
case msg_type::FetchDone:
case msg_type::FetchDoneMoreResultsets:
if (0 == m_ccount)
m_next_state = msg_type::FetchDone == type ? CLOSE : MDATA;
else
m_next_state = ROWS;
break;
/*
If we see StmtExecuteOk then the meta-data processing stage ends and we
proceed to the final stage. The message will be part of the next stage.
*/
case msg_type::StmtExecuteOk:
if (0 < m_ccount)
return UNEXPECTED;
m_next_state = CLOSE;
break;
default: return UNEXPECTED;
};
// If we reached here, then the current stage is completed.
m_completed = true;
/*
Processing meta-data has ended now - report the column count to
the processor. Note that it can be 0 if no result-set (and thus
no meta-data) was present in the reply.
*/
static_cast<Mdata_processor*>(m_prc)->col_count(m_ccount);
/*
If there is no result-set (no meta-data info was seen) then we either
look at StmtExecuteOk, which shuld be processed by the next stage, or at
some FetchDoneXXX messages which are consumed as part of this stage
(and ignored).
*/
if (0 == m_ccount && msg_type::StmtExecuteOk != type)
return EXPECTED;
/*
Since we stop before processing message payload, we must update
state here as process_next() will not be called in this case.
*/
m_result_state = m_next_state;
return STOP;
case ROWS:
/*
In this state we expect Row* <more> sequence. It can start with one of
the following messages:
- Row - next row from the row-set, we continue reading rows until we
see <more>;
- FetchDoneXXX - these messages start <more> sequence; they are consumed
as part of this stage and the next stage starts.
*/
switch (type)
{
case msg_type::Row: return EXPECTED;
case msg_type::FetchDone:
m_next_state = CLOSE; // no more result-sets
break;
case msg_type::FetchDoneMoreResultsets:
m_next_state = MDATA; // proceed to next result-set
break;
default: return UNEXPECTED;
};
/*
This stage is completed and the Fetch* message will be the last one
processed during this stage.
*/
m_completed= true;
return EXPECTED;
case CLOSE:
/*
In this state the final StmtExecuteOk message is expected from the
server. After processing this message the current processing stage
is completed.
*/
m_completed = true;
m_next_state = DONE;
return msg_type::StmtExecuteOk == type ? EXPECTED : UNEXPECTED;
case DONE:
default:
// No message should be processed if state is not one of the above.
assert(false);
return UNEXPECTED;
}
}
/*
Determine whether to read next message after processing the current one.
We always want to see the next message unless current processing stage
is completed.
*/
bool Rcv_result_base::do_process_next()
{
return !m_completed;
}
}}}
/*
Implementation of Protocol methods using the internal implementation.
*/
namespace cdk {
namespace protocol {
namespace mysqlx {
Protocol::Op& Protocol::rcv_Rows(Row_processor &prc)
{
return get_impl().rcv_start<Rcv_result>(prc);
}
Protocol::Op& Protocol::rcv_MetaData(Mdata_processor &prc)
{
return get_impl().rcv_start<Rcv_result>(prc);
}
Protocol::Op& Protocol::rcv_StmtReply(Stmt_processor &prc)
{
return get_impl().rcv_start<Rcv_result>(prc);
}
}}} // cdk::protocol::mysqlx
namespace cdk {
namespace protocol {
namespace mysqlx {
/*
Process messages received from server in ROWS context
*/
template<>
void Rcv_result_base::process_msg_with(
Mysqlx::Resultset::FetchDoneMoreResultsets&, Row_processor &rp
)
{
/*
Inform the processor about finishing reading all rows from the
current result-set, but the server has another result-set.
Next messages will not be read automatically.
*/
rp.done(true, true);
}
template<>
void Rcv_result_base::process_msg_with(
Mysqlx::Resultset::FetchDone&, Row_processor &rp
)
{
/*
Fetching all rows from the cursor is finished.
No need to parse because there is no payload in this message.
Notify the processor.
*/
rp.done(true, false);
}
template<>
void Rcv_result_base::process_msg_with(
Mysqlx::Resultset::Row &row, Row_processor &rp
)
{
row_count_t rcount= m_rcount++;
if(!rp.row_begin(rcount))
return; // skip this row if the processor doesn't want it
col_count_t ccount = 0;
for (RepeatedPtrField< ::std::string>::const_iterator it = row.field().begin();
it != row.field().end(); ++it, ++ccount)
{
if (it->length() == 0)
{
rp.col_null(ccount);
continue;
}
size_t read_window = rp.col_begin(ccount, it->length());
size_t pos= 0;
while (it->length() > pos && read_window)
{
size_t bytes_to_feed = it->length() - pos > read_window ? read_window : it->length() - pos;
size_t read_window_new = rp.col_data(ccount, bytes((byte*)(it->c_str() + pos), bytes_to_feed));
pos += read_window;
read_window = read_window_new;
}
rp.col_end(ccount, it->length());
}
rp.row_end(rcount);
}
/*
Process column metadata
*/
template<>
void Rcv_result_base::process_msg_with(
Mysqlx::Resultset::ColumnMetaData &col_mdata, Mdata_processor &mdata_proc
)
{
col_count_t ccount= m_ccount++;
assert(col_mdata.type() < std::numeric_limits<unsigned short>::max());
mdata_proc.col_type(ccount, static_cast<unsigned short>(col_mdata.type()));
mdata_proc.col_name(ccount, col_mdata.name(),
col_mdata.has_original_name() ? col_mdata.original_name() : "");
if (col_mdata.has_table())
mdata_proc.col_table(ccount, col_mdata.table(),
col_mdata.has_original_table() ? col_mdata.original_table() : "");
if (col_mdata.has_schema())
mdata_proc.col_schema(ccount, col_mdata.schema(),
col_mdata.has_catalog() ? col_mdata.catalog() : "");
if (col_mdata.has_collation())
mdata_proc.col_collation(ccount, col_mdata.collation());
if (col_mdata.has_length())
mdata_proc.col_length(ccount, col_mdata.length());
if (col_mdata.has_fractional_digits())
{
assert(col_mdata.fractional_digits() < std::numeric_limits<unsigned short>::max());
mdata_proc.col_decimals(ccount,
static_cast<unsigned short>(col_mdata.fractional_digits())
);
}
if (col_mdata.has_content_type())
{
assert(col_mdata.content_type() < std::numeric_limits<unsigned short>::max());
mdata_proc.col_content_type(ccount,
static_cast<unsigned short>(col_mdata.content_type()));
}
if (col_mdata.has_flags())
mdata_proc.col_flags(ccount, col_mdata.flags());
}
template<>
void Rcv_result_base::process_msg_with(Mysqlx::Ok &ok, Mdata_processor &prc)
{
prc.ok(ok.msg());
}
template<>
void Rcv_result_base::process_msg_with(
Mysqlx::Sql::StmtExecuteOk&, Stmt_processor &prc
)
{
prc.execute_ok();
}
void Rcv_result::process_msg(msg_type_t type, Message &msg)
{
// Mark operation as completed if we see error message.
if (type == msg_type::Error)
{
m_next_state = DONE;
m_completed = true;
}
// Invoke the default message processing
Op_rcv::process_msg(type, msg);
}
void Rcv_result::do_process_msg(msg_type_t type, Message &msg)
{
assert(m_prc);
switch (m_result_state)
{
case START:
case MDATA:
Dispatcher::process_msg_with(type, msg, *static_cast<Mdata_processor*>(m_prc));
break;
case ROWS:
Dispatcher::process_msg_with(type, msg, *static_cast<Row_processor*>(m_prc));
break;
case CLOSE:
Dispatcher::process_msg_with(type, msg, *static_cast<Stmt_processor*>(m_prc));
break;
case DONE:
THROW("processing message in wrong state");
}
}
}}} // cdk::protocol::mysqlx