common/result.cc (191 lines of code) (raw):
/*
* Copyright (c) 2015, 2024, Oracle and/or its affiliates.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2.0, as
* published by the Free Software Foundation.
*
* This program is designed to work with certain software (including
* but not limited to OpenSSL) that is licensed under separate terms, as
* designated in a particular file or component or in included license
* documentation. The authors of MySQL hereby grant you an additional
* permission to link the program and your derivative works with the
* separately licensed software that they have either included with
* the program or referenced in the documentation.
*
* Without limiting anything contained in the foregoing, this file,
* which is part of Connector/C++, is also subject to the
* Universal FOSS Exception, version 1.0, a copy of which can be found at
* https://oss.oracle.com/licenses/universal-foss-exception.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU General Public License, version 2.0, for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <mysql/cdk.h>
#include "result.h"
#include "session.h"
#include <vector>
#include <sstream>
#include <iomanip>
#include <cctype>
/*
Implementation of result and row objects and conversion of raw bytes
into values.
*/
using namespace ::mysqlx::impl::common;
/*
Decoding raw bytes into values
==============================
Overloads of convert() defined below handle conversion of raw representation
of values of different CDK types into Value object. A format descriptor is
used to learn about the raw encoding format and perform conversion to the
correct type using a corresponding codec object.
*/
Value
mysqlx::impl::common::
convert(cdk::bytes data, Format_descr<cdk::TYPE_STRING> &fd)
{
/*
String encoding has artificial 0x00 byte appended at the end to
distinguish the empty string from the null value. We skip
the trailing 0x00 byte to get just the raw bytes that encode the string.
*/
cdk::bytes raw(data.begin(), data.end() - 1);
// If this string value is in fact a SET, then return it as raw bytes.
if (fd.m_format.is_set())
return { raw.begin(), raw.size() };
auto &codec = fd.m_codec;
cdk::string str;
codec.from_bytes(raw, str);
return (std::u16string)str;
}
Value
mysqlx::impl::common::
convert(cdk::bytes data, Format_descr<cdk::TYPE_INTEGER> &fd)
{
auto &codec = fd.m_codec;
auto &fmt = fd.m_format;
if (fmt.is_unsigned())
{
uint64_t val;
codec.from_bytes(data, val);
return Value(val);
}
else
{
int64_t val;
codec.from_bytes(data, val);
return Value(val);
}
}
Value
mysqlx::impl::common::
convert(cdk::bytes data, Format_descr<cdk::TYPE_FLOAT> &fd)
{
auto &fmt = fd.m_format;
if (fmt.FLOAT == fmt.type())
{
float val;
fd.m_codec.from_bytes(data, val);
return Value(val);
}
// For other formats (DOUBLE, DECIMAL), try storing in double
// TODO: exact representation for DECIMAL values?
{
double val;
fd.m_codec.from_bytes(data, val);
return Value(val);
}
}
Value
mysqlx::impl::common::
convert(cdk::bytes data, Format_descr<cdk::TYPE_DOCUMENT>&)
{
if (0 == data.size())
return Value();
/*
Note: Here we do not look into format description and blindly assume
that document is represented as a JSON string.
Otherwise, implementation that would not assume what underlying
representation is used for documnets should use a Codec to decode
the raw bytes and build a representation of the documnent to be
stored in the Value instance.
*/
// trim initial space
unsigned i;
for (i = 0; i < data.size() && std::isspace(*(data.begin() + i)); ++i);
std::string json(data.begin() + i, data.end()-1);
return Value::Access::mk_json(json);
}
Value
mysqlx::impl::common::
convert(cdk::foundation::bytes data, Format_descr<cdk::TYPE_DATETIME> &)
{
return{ data.begin(), data.size()};
}
/*
Result implementation
=====================
*/
Result_impl::Result_impl(Result_init &init)
: m_sess(init.get_session()), m_reply(init.get_reply())
{
auto lock = m_sess->lock();
m_sess->register_result(this);
init.init_result(*this);
}
Result_impl::~Result_impl()
{
auto lock = m_sess->lock();
try {
if (m_sess) m_sess->deregister_result(this);
} catch (...) {
}
// Note: Cursor must be deleted before reply.
delete m_cursor;
delete m_reply;
}
bool Result_impl::next_result() {
auto lock = m_sess->lock();
pop_row_cache();
if (!m_result_cache.empty()) return true;
// Nothing in cache... jump to next resultset and read it
return read_next_result();
}
bool Result_impl::read_next_result()
{
/*
Note: closing cursor discards previous rset. Only then
we can move to the next rset (if any).
*/
auto lock = m_sess->lock();
if (m_pending_rows)
{
assert(m_cursor);
m_cursor->close();
}
// Prepare for reading (next) result
delete m_cursor;
m_cursor = nullptr;
m_pending_rows = false;
m_inited = true;
if (!m_reply)
return false;
if (!m_reply->has_results()) {
if (0 < m_reply->entry_count())
m_reply->get_error().rethrow();
m_sess->deregister_result(this);
return false;
}
// Result has row data - create cursor to access it
m_cursor = new cdk::Cursor(*m_reply);
// Wait for cursor to fetch result meta-data and copy it to local storage.
m_cursor->wait();
m_pending_rows = true;
//Push new row cache
push_row_cache();
return true;
}
void Result_impl::push_row_cache() {
auto lock = m_sess->lock();
m_result_mdata.push(Shared_meta_data(new Meta_data(*m_cursor )));
m_result_cache.push(Row_cache());
m_result_cache_size.push(0);
}
const Row_data* Result_impl::get_row()
{
auto lock = m_sess->lock();
// TODO: Session parameter for cache prefetch size
load_cache(16);
if (m_result_cache.empty() || m_result_cache.front().empty())
{
if (m_reply->entry_count() > 0)
m_reply->get_error().rethrow();
return nullptr;
}
m_row = m_result_cache.front().front();
m_result_cache.front().pop_front();
m_result_cache_size.front()--;
return &m_row;
}
/*
Returns true if there are some rows in the cache after returning from
the call. If cache is empty when this method is called, it loads
prefetch_size rows into the cache. If prefetch_size is 0, it loads
all remaining rows into the cache (even if cache currently contains some
rows).
It caches elements to the last queue element, since more resultsets could have
been cached before.
*/
bool Result_impl::load_cache(row_count_t prefetch_size)
{
auto lock = m_sess->lock();
if (!m_inited)
next_result();
if(m_result_cache.empty())
return false;
if (!m_result_cache.back().empty() && 0 != prefetch_size)
return true;
if (!m_pending_rows)
return false;
/*
Note: if cache is not empty then m_cache_it correctly points at the last
element in the cache.
*/
if (m_result_cache.back().empty())
m_cache_it = m_result_cache.back().before_begin();
// Initiate row reading operation
if (0 < prefetch_size)
m_cursor->get_rows(*this, prefetch_size);
else
m_cursor->get_rows(*this); // this reads all remaining rows
// Wait for it to complete
m_cursor->wait();
/*
Cleanup after reading all rows: close the cursor if whole rset has
been consumed (or error happend, in which case server won't sent more
data).
*/
if (!m_pending_rows || m_reply->entry_count() > 0)
{
m_cursor->close();
m_pending_rows = false;
/*
If there are no more rsets in the reply, deregister the result so that
session is unlocked for the next command.
*/
if (m_reply->end_of_reply())
m_sess->deregister_result(this);
}
return !m_result_cache.back().empty();
}
// Row_processor interface implementation
size_t Result_impl::field_begin(col_count_t pos, size_t size)
{
//m_row.insert(std::pair<col_count_t, Buffer>(pos, Buffer()));
m_row.emplace(pos, Buffer());
// FIX
return size;
}
size_t Result_impl::field_data(col_count_t pos, bytes data)
{
m_row[(unsigned)pos].append(data);
// FIX
return data.size();
}
void Result_impl::row_end(row_count_t)
{
if (!m_row_filter(m_row))
return;
m_cache_it = m_result_cache.back().emplace_after(m_cache_it, std::move(m_row));
m_result_cache_size.back()++;
}
void Result_impl::end_of_data()
{
m_pending_rows = false;
}