mysqlshdk/libs/db/replay/trace.cc (652 lines of code) (raw):

/* * Copyright (c) 2017, 2024, Oracle and/or its affiliates. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License, version 2.0, * as published by the Free Software Foundation. * * This program is designed to work with certain software (including * but not limited to OpenSSL) that is licensed under separate terms, * as designated in a particular file or component or in included license * documentation. The authors of MySQL hereby grant you an additional * permission to link the program and your derivative works with the * separately licensed software that they have either included with * the program or referenced in the documentation. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See * the GNU General Public License, version 2.0, for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include "mysqlshdk/libs/db/replay/trace.h" #include <rapidjson/document.h> #include <rapidjson/error/en.h> #include <rapidjson/filereadstream.h> #include <rapidjson/filewritestream.h> #include <rapidjson/rapidjson.h> #include <rapidjson/reader.h> #include <rapidjson/stringbuffer.h> #include <rapidjson/writer.h> #include <utility> #include "mysqlshdk/libs/db/replay/replayer.h" #include "mysqlshdk/libs/utils/debug.h" #include "mysqlshdk/libs/utils/utils_file.h" #include "mysqlshdk/libs/utils/utils_path.h" #include "mysqlshdk/libs/utils/utils_stacktrace.h" #include "mysqlshdk/libs/utils/utils_string.h" // TODO(alfredo) warnings, multi-results, mysqlx namespace mysqlshdk { namespace db { namespace replay { sequence_error::sequence_error(const std::string &what) : db::Error(what.c_str(), 9999) { std::cerr << "SESSION REPLAY ERROR: " << what << "\n"; mysqlshdk::utils::print_stacktrace(); if (getenv("TEST_DEBUG")) assert(0); } template <typename T> void set(rapidjson::Document *doc, const char *key, T value) { rapidjson::Value v(value); rapidjson::Value k(key, doc->GetAllocator()); doc->AddMember(k, v, doc->GetAllocator()); } void set(rapidjson::Document *doc, const char *key, const char *value) { rapidjson::Value v; rapidjson::Value k(key, doc->GetAllocator()); if (!value) { v.SetNull(); } else { v.SetString(value, doc->GetAllocator()); } doc->AddMember(k, v, doc->GetAllocator()); } void set(rapidjson::Document *doc, const char *key, const std::string &value) { rapidjson::Value v(value.c_str(), doc->GetAllocator()); rapidjson::Value k(key, doc->GetAllocator()); doc->AddMember(k, v, doc->GetAllocator()); } template <typename T> void set(rapidjson::Document *doc, rapidjson::Value *obj, const char *key, T value) { rapidjson::Value v(value); rapidjson::Value k(key, doc->GetAllocator()); obj->AddMember(k, v, doc->GetAllocator()); } void set(rapidjson::Document *doc, rapidjson::Value *obj, const char *key, const std::string &value) { rapidjson::Value v(value.c_str(), doc->GetAllocator()); rapidjson::Value k(key, doc->GetAllocator()); obj->AddMember(k, v, doc->GetAllocator()); } std::string to_json(rapidjson::Value *value) { rapidjson::Document doc; doc.CopyFrom(*value, doc.GetAllocator()); rapidjson::StringBuffer buffer; rapidjson::Writer<rapidjson::StringBuffer> writer(buffer); doc.Accept(writer); return buffer.GetString(); } std::string get_string(rapidjson::Value &value) { // NOLINT if (!value.IsString()) { throw std::logic_error("Bad JSON data. String expected, got " + to_json(&value)); } return value.GetString(); } int64_t get_int(rapidjson::Value &value) { // NOLINT if (!value.IsInt64()) { throw std::logic_error("Bad JSON data. Int expected, got " + to_json(&value)); } return value.GetInt64(); } uint64_t get_uint(rapidjson::Value &value) { // NOLINT if (!value.IsUint64()) { throw std::logic_error("Bad JSON data. Uint expected, got " + to_json(&value)); } return value.GetUint64(); } std::string make_json( const std::string &type, const std::string &subtype, const std::vector<std::pair<std::string, std::string>> &items, int i) { rapidjson::Document doc; doc.SetObject(); set(&doc, "type", type); set(&doc, "subtype", subtype); set(&doc, "index", i); for (const auto &item : items) { set(&doc, item.first.c_str(), item.second); } rapidjson::StringBuffer buffer; rapidjson::Writer<rapidjson::StringBuffer> writer(buffer); doc.Accept(writer); return buffer.GetString(); } void Trace_writer::serialize_connect( const mysqlshdk::db::Connection_options &data, const std::string &protocol) { _stream << make_json("request", "CONNECT", {{"uri", data.as_uri(uri::formats::full())}, {"protocol", protocol}}, ++_idx) << ",\n"; _log_label = shcore::path::basename(_path); auto ext = _log_label.rfind('.'); if (ext != std::string::npos) _log_label = _log_label.substr(0, ext); if (data.has_host()) { _log_label.append("/").append(data.get_host()); if (data.has_port()) _log_label.append(":").append( std::to_string(data.has_port() ? data.get_port() : 3306)); } else { _log_label.append(data.get_socket()); } DBUG_LOG("sql", _log_label << ": connect " << data.as_uri(uri::formats::full())); } void Trace_writer::serialize_close() { DBUG_LOG("sql", _log_label << ": close"); _stream << make_json("request", "CLOSE", {}, ++_idx) << ",\n"; } void Trace_writer::serialize_query(const std::string &sql) { DBUG_LOG("sqlall", _log_label << ": " << sql); _stream << make_json("request", "QUERY", {{"sql", sql}}, ++_idx) << ",\n"; } void Trace_writer::serialize_ok() { _stream << make_json("response", "OK", {}, ++_idx) << ",\n"; } void Trace_writer::serialize_connect_ok( const std::map<std::string, std::string> &info) { rapidjson::Document doc; doc.SetObject(); set(&doc, "type", "response"); set(&doc, "subtype", "CONNECT_OK"); set(&doc, "index", ++_idx); for (const auto &it : info) { set(&doc, it.first.c_str(), it.second.c_str()); } rapidjson::StringBuffer buffer; rapidjson::Writer<rapidjson::StringBuffer> writer(buffer); doc.Accept(writer); _stream << buffer.GetString() << ",\n"; } void serialize_result_metadata(rapidjson::Document *doc, std::shared_ptr<db::IResult> result) { rapidjson::Value clist; clist.SetArray(); for (const auto &column : result->get_metadata()) { rapidjson::Value cobj; cobj.SetObject(); set(doc, &cobj, "schema", column.get_schema()); set(doc, &cobj, "table_name", column.get_table_name()); set(doc, &cobj, "table_label", column.get_table_label()); set(doc, &cobj, "column_name", column.get_column_name()); set(doc, &cobj, "column_label", column.get_column_label()); set(doc, &cobj, "length", column.get_length()); set(doc, &cobj, "fractional", column.get_fractional()); set(doc, &cobj, "type", to_string(column.get_type())); set(doc, &cobj, "collation", column.get_collation_name()); set(doc, &cobj, "charset", column.get_charset_name()); set(doc, &cobj, "collation_id", column.get_collation()); set(doc, &cobj, "unsigned", column.is_unsigned()); set(doc, &cobj, "zerofill", column.is_zerofill()); set(doc, &cobj, "binary", column.is_binary()); clist.PushBack(cobj, doc->GetAllocator()); } doc->AddMember("columns", clist, doc->GetAllocator()); } void push_field( rapidjson::Document *doc, rapidjson::Value *array, const db::IRow *row, uint32_t i, const std::function<std::string(const std::string &value)> &hook) { rapidjson::Value value; if (row->is_null(i)) { value.SetNull(); } else { switch (row->get_type(i)) { case Type::Null: value.SetNull(); break; case Type::Integer: value = row->get_int(i); break; case Type::UInteger: value = row->get_uint(i); break; case Type::Float: value = row->get_float(i); break; case Type::Double: value = row->get_double(i); break; case Type::Decimal: value.SetString(row->get_as_string(i).c_str(), doc->GetAllocator()); break; case Type::Bit: value = std::get<0>(row->get_bit(i)); break; case Type::String: case Type::Bytes: case Type::Geometry: case Type::Json: case Type::Date: case Type::Time: case Type::DateTime: case Type::Enum: case Type::Set: if (hook) { value.SetString(hook(row->get_as_string(i)).c_str(), doc->GetAllocator()); } else { value.SetString(row->get_as_string(i).c_str(), doc->GetAllocator()); } break; } } array->PushBack(value, doc->GetAllocator()); } bool is_set_as_string(Type type) { switch (type) { case Type::String: case Type::Bytes: case Type::Geometry: case Type::Json: case Type::Date: case Type::Time: case Type::DateTime: case Type::Enum: case Type::Set: return true; default: return false; } } void serialize_result_rows( rapidjson::Document *doc, std::shared_ptr<db::IResult> result, const std::function<std::string(const std::string &value)> &hook) { rapidjson::Value rlist; rlist.SetArray(); const db::IRow *row = result->fetch_one(); while (row) { rapidjson::Value fields; fields.SetArray(); for (uint32_t i = 0; i < row->num_fields(); i++) { push_field(doc, &fields, row, i, hook); } rlist.PushBack(fields, doc->GetAllocator()); row = result->fetch_one(); } doc->AddMember("rows", rlist, doc->GetAllocator()); } void Trace_writer::serialize_result( std::shared_ptr<db::IResult> result, const std::function<std::string(const std::string &value)> &hook) { try { rapidjson::Document doc; doc.SetObject(); set(&doc, "type", "response"); set(&doc, "subtype", "RESULT"); set(&doc, "index", ++_idx); set(&doc, "auto_increment_value", result->get_auto_increment_value()); set(&doc, "affected_rows", result->get_affected_row_count()); set(&doc, "warning_count", result->get_warning_count()); set(&doc, "info", result->get_info()); // Recording of gtids is not mandatory, i.e. is done only when they // are available, and it only occurs for Classic Sessions at the moment try { std::vector<std::string> gtids = result->get_gtids(); if (!gtids.empty()) { set(&doc, "gtids", shcore::str_join(gtids, ",")); } } catch (const std::logic_error &error) { // NO-OP: for 'not implemented' on the x protocol std::string msg(error.what()); if (msg != "not implemented") throw; } if (result->has_resultset()) { serialize_result_metadata(&doc, result); serialize_result_rows(&doc, result, hook); } rapidjson::StringBuffer buffer; rapidjson::Writer<rapidjson::StringBuffer> writer(buffer); doc.Accept(writer); _stream << buffer.GetString() << ",\n"; } catch (const std::exception &e) { std::cerr << "Exception serializing result trace: " << e.what() << "\n"; throw; } } void Trace_writer::serialize_error(const db::Error &e) { DBUG_LOG("sql", _log_label << ": MySQL error: " << e.what() << " (" << e.code()); _stream << make_json("response", "ERROR", {{"code", std::to_string(e.code())}, {"msg", e.what()}, {"sqlstate", e.sqlstate()}}, ++_idx) << ",\n"; } void Trace_writer::serialize_error(const std::runtime_error &e) { DBUG_LOG("sql", "Runtime error in " << _path << ": " << e.what()); _stream << make_json("response", "ERROR", {{"code", ""}, {"msg", e.what()}, {"sqlstate", ""}}, ++_idx) << ",\n"; } std::unique_ptr<Trace_writer> Trace_writer::create(const std::string &path) { return std::unique_ptr<Trace_writer>{new Trace_writer(path)}; } void Trace_writer::set_metadata( const std::map<std::string, std::string> &meta) { rapidjson::Document doc; doc.SetObject(); rapidjson::Value value; value.SetObject(); for (const auto &i : meta) { rapidjson::Value k(i.first.c_str(), doc.GetAllocator()); rapidjson::Value v(i.second.c_str(), doc.GetAllocator()); value.AddMember(k, v, doc.GetAllocator()); } doc.AddMember("metadata", value, doc.GetAllocator()); rapidjson::StringBuffer buffer; rapidjson::Writer<rapidjson::StringBuffer> writer(buffer); doc.Accept(writer); _stream << buffer.GetString() << ",\n"; } Trace_writer::Trace_writer(const std::string &path) : _path(path) { _log_label = shcore::path::basename(path); DBUG_LOG("sql", "Creating trace file " << path); _stream.open(path); if (_stream.bad()) throw std::logic_error(path + ": " + strerror(errno)); _stream.rdbuf()->pubsetbuf(0, 0); _stream << "[\n"; } Trace_writer::~Trace_writer() { _stream << "null]\n"; DBUG_LOG("sql", "Closed trace file " << _path << " (" << _idx << " entries)"); } // ------------------------------------------------ Trace::Trace(const std::string &path) : _trace_path(path) { std::FILE *file; char buffer[1024 * 4]; DBUG_LOG("sql", "Opening trace file " << path); file = std::fopen(path.c_str(), "r"); if (!file) throw std::logic_error(path + ": " + strerror(errno)); _index = 0; rapidjson::FileReadStream stream(file, buffer, sizeof(buffer)); _doc.ParseStream(stream); std::fclose(file); if (_doc.HasParseError()) { std::cerr << "REPLAY ERROR: Error parsing trace file " << path << ":" << _doc.GetErrorOffset() << ":" << rapidjson::GetParseError_En(_doc.GetParseError()) << "\n"; throw std::logic_error( shcore::str_format("Error parsing trace file %s:%zu:%s", path.c_str(), _doc.GetErrorOffset(), rapidjson::GetParseError_En(_doc.GetParseError()))); } assert(_doc.IsArray()); } Trace::~Trace() = default; void Trace::next(rapidjson::Value *entry) { if (_index >= _doc.Size() - 1) throw sequence_error("Session trace is over"); *entry = _doc[_index++]; if (0) { std::cerr << "Trace read: " << to_json(entry) << "\n"; } } std::map<std::string, std::string> Trace::get_metadata() { return {}; } void Trace::expect_request(rapidjson::Value *doc, const char *subtype, const char *detail) { if (_got_error) throw sequence_error("Session " + _trace_path + " is invalidated"); if (strcmp((*doc)["type"].GetString(), "request") != 0) { _got_error = true; throw sequence_error(("Attempted a request in replayed session " + _trace_path + ", but got something else: " + to_json(doc)) .c_str()); } if (strcmp((*doc)["subtype"].GetString(), subtype) != 0) { _got_error = true; if (detail) throw sequence_error( shcore::str_format( "Attempting '%s' (%s) but replayed session %s has %s", subtype, detail, _trace_path.c_str(), to_json(doc).c_str()) .c_str()); else throw sequence_error( shcore::str_format("Attempting '%s' but replayed session %s has %s", subtype, _trace_path.c_str(), to_json(doc).c_str()) .c_str()); } _last_request = to_json(doc); } mysqlshdk::db::Connection_options Trace::expected_connect() { rapidjson::Value obj; next(&obj); expect_request(&obj, "CONNECT"); DBUG_LOG("sqlall", shcore::path::basename(_trace_path) << ": connect: " << obj["uri"].GetString()); return mysqlshdk::db::Connection_options(obj["uri"].GetString()); } void Trace::expected_close() { rapidjson::Value obj; next(&obj); expect_request(&obj, "CLOSE"); } std::string Trace::expected_query(const std::string &expected) { rapidjson::Value obj; next(&obj); expect_request(&obj, "QUERY", expected.c_str()); std::string query = obj["sql"].GetString(); DBUG_LOG("sqlall", shcore::path::basename(_trace_path) << ": " << query); return query; } void throw_error(rapidjson::Value *doc) { const char *subtype = (*doc)["subtype"].GetString(); if (strcmp(subtype, "ERROR") == 0) { throw db::Error((*doc)["msg"].GetString(), std::atoi((*doc)["code"].GetString()), (*doc)["sqlstate"].GetString()); } else { throw std::logic_error( shcore::str_format("Unexpected entry in replayed session %s", to_json(doc).c_str()) .c_str()); } } void Trace::expected_status() { rapidjson::Value obj; next(&obj); if (strcmp(obj["type"].GetString(), "response") != 0) throw sequence_error(shcore::str_format( "Expected OK response in session trace, but got something else: %s", to_json(&obj).c_str())); const char *subtype = obj["subtype"].GetString(); if (strcmp(subtype, "OK") == 0) { } else { throw_error(&obj); } } void Trace::expected_connect_status( std::map<std::string, std::string> *out_info) { rapidjson::Value obj; next(&obj); if (strcmp(obj["type"].GetString(), "response") != 0) throw sequence_error(shcore::str_format( "Expected CONNECT_OK in session trace, but got something else: %s", to_json(&obj).c_str())); const char *subtype = obj["subtype"].GetString(); if (strcmp(subtype, "CONNECT_OK") == 0) { for (auto itr = obj.MemberBegin(); itr != obj.MemberEnd(); ++itr) { if (itr->value.IsString()) (*out_info)[itr->name.GetString()] = itr->value.GetString(); } } else { throw_error(&obj); } } void unserialize_result_metadata(rapidjson::Value *clist, std::vector<Column> *metadata) { for (unsigned i = 0; i < clist->Size(); i++) { rapidjson::Value &cobj((*clist)[i]); db::Column column( "", get_string(cobj["schema"]), get_string(cobj["table_name"]), get_string(cobj["table_label"]), get_string(cobj["column_name"]), get_string(cobj["column_label"]), get_int(cobj["length"]), get_int(cobj["fractional"]), db::string_to_type(get_string(cobj["type"])), get_int(cobj["collation_id"]), cobj["unsigned"].GetBool(), cobj["zerofill"].GetBool(), cobj["binary"].GetBool()); metadata->push_back(column); } } class Row_unserializer : public db::IRow { rapidjson::Value &_fields; const std::vector<db::Column> &_columns; public: Row_unserializer(rapidjson::Value &fields, // NOLINT(runtime/references) const std::vector<db::Column> &columns) : _fields(fields), _columns(columns) {} uint32_t num_fields() const override { return _columns.size(); } Type get_type(uint32_t index) const override { return _columns[index].get_type(); } bool is_null(uint32_t index) const override { return _fields[index].IsNull(); } std::string get_as_string(uint32_t index) const override { if (_fields[index].IsString()) return _fields[index].GetString(); return to_json(&_fields[index]); } std::string get_string(uint32_t index) const override { return replay::get_string(_fields[index]); } int64_t get_int(uint32_t index) const override { return replay::get_int(_fields[index]); } uint64_t get_uint(uint32_t index) const override { return _fields[index].GetUint64(); } float get_float(uint32_t index) const override { return _fields[index].GetDouble(); } double get_double(uint32_t index) const override { return _fields[index].GetDouble(); } std::pair<const char *, size_t> get_string_data(uint32_t) const override { throw std::logic_error("not implemented"); } void get_raw_data(uint32_t, const char **, size_t *) const override { throw std::logic_error("not implemented"); } std::tuple<uint64_t, int> get_bit(uint32_t index) const override { return {replay::get_int(_fields[index]), 0}; } }; void Trace::unserialize_result_rows( rapidjson::Value *rlist, std::shared_ptr<Result_mysql> result, std::function<std::unique_ptr<IRow>(std::unique_ptr<IRow>)> intercept) { for (unsigned i = 0; i < rlist->Size(); i++) { if (intercept) { std::unique_ptr<IRow> row_copier(intercept(std::unique_ptr<IRow>{ new Row_unserializer((*rlist)[i], result->_metadata)})); result->_rows.emplace_back(*row_copier); } else { result->_rows.emplace_back( Row_unserializer((*rlist)[i], result->_metadata)); } } } void Trace::unserialize_result_rows( rapidjson::Value *rlist, std::shared_ptr<Result_mysqlx> result, std::function<std::unique_ptr<IRow>(std::unique_ptr<IRow>)> intercept) { for (unsigned i = 0; i < rlist->Size(); i++) { if (intercept) { std::unique_ptr<IRow> row_copier(intercept(std::unique_ptr<IRow>{ new Row_unserializer((*rlist)[i], result->_metadata)})); result->_rows.emplace_back(*row_copier); } else { result->_rows.emplace_back( Row_unserializer((*rlist)[i], result->_metadata)); } } } std::shared_ptr<Result_mysql> Trace::expected_result( std::function<std::unique_ptr<IRow>(std::unique_ptr<IRow>)> intercept) { rapidjson::Value obj; next(&obj); if (strcmp(obj["type"].GetString(), "response") != 0) { throw sequence_error(shcore::str_format( "Expected RESULT for %s in session trace, but got something else: %s", _last_request.c_str(), to_json(&obj).c_str())); } const char *subtype = obj["subtype"].GetString(); if (strcmp(subtype, "RESULT") == 0) { auto last_insert_id = get_int(obj["auto_increment_value"]); auto affected_rows = get_uint(obj["affected_rows"]); auto warning_count = get_int(obj["warning_count"]); auto info = get_string(obj["info"]); std::string gtids = obj.HasMember("gtids") ? get_string(obj["gtids"]) : ""; std::shared_ptr<Result_mysql> result(new Result_mysql( affected_rows, warning_count, last_insert_id, info.c_str(), false, gtids.empty() ? std::vector<std::string>() : shcore::str_split(gtids, ","))); if (obj.HasMember("columns")) { result->_has_resultset = true; rapidjson::Value &cols(obj["columns"]); unserialize_result_metadata(&cols, &result->_metadata); } if (obj.HasMember("rows")) { rapidjson::Value &rows(obj["rows"]); unserialize_result_rows(&rows, result, intercept); } return result; } else { throw_error(&obj); } return {}; } std::shared_ptr<Result_mysqlx> Trace::expected_result_x( std::function<std::unique_ptr<IRow>(std::unique_ptr<IRow>)> intercept) { rapidjson::Value obj; next(&obj); if (strcmp(obj["type"].GetString(), "response") != 0) throw sequence_error(shcore::str_format( "Expected RESULT in session trace, but got something else: %s", to_json(&obj).c_str())); const char *subtype = obj["subtype"].GetString(); if (strcmp(subtype, "RESULT") == 0) { auto last_insert_id = get_int(obj["auto_increment_value"]); auto affected_rows = get_uint(obj["affected_rows"]); auto warning_count = get_int(obj["warning_count"]); auto info = get_string(obj["info"]); std::shared_ptr<Result_mysqlx> result(new Result_mysqlx( affected_rows, warning_count, last_insert_id, info.c_str())); if (obj.HasMember("columns")) { result->_has_resultset = true; rapidjson::Value &cols(obj["columns"]); unserialize_result_metadata(&cols, &result->_metadata); } if (obj.HasMember("rows")) { rapidjson::Value &rows(obj["rows"]); unserialize_result_rows(&rows, result, intercept); } return result; } else { throw_error(&obj); } return {}; } //-------------- void save_info(const std::string &path, const std::map<std::string, std::string> &state) { rapidjson::Document doc; doc.SetObject(); for (const auto &m : state) { set(&doc, m.first.c_str(), m.second.c_str()); } std::FILE *file; char buffer[1024 * 4]; file = std::fopen(path.c_str(), "w+"); if (!file) throw std::runtime_error(path + ": " + strerror(errno)); rapidjson::FileWriteStream stream(file, buffer, sizeof(buffer)); rapidjson::Writer<rapidjson::FileWriteStream> writer(stream); doc.Accept(writer); std::fclose(file); } std::map<std::string, std::string> load_info(const std::string &path) { std::FILE *file; char buffer[1024 * 4]; file = std::fopen(path.c_str(), "r"); if (!file) throw std::runtime_error(path + ": " + strerror(errno)); rapidjson::Document doc; rapidjson::FileReadStream stream(file, buffer, sizeof(buffer)); doc.ParseStream(stream); std::fclose(file); std::map<std::string, std::string> map; for (auto itr = doc.MemberBegin(); itr != doc.MemberEnd(); ++itr) { map[itr->name.GetString()] = itr->value.GetString(); } return map; } } // namespace replay } // namespace db } // namespace mysqlshdk