plugins/migration/copytable/copytable.h (341 lines of code) (raw):

/* * Copyright (c) 2012, 2018, Oracle and/or its affiliates. All rights reserved. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License, version 2.0, * as published by the Free Software Foundation. * * This program is designed to work with certain software (including * but not limited to OpenSSL) that is licensed under separate terms, as * designated in a particular file or component or in included license * documentation. The authors of MySQL hereby grant you an additional * permission to link the program and your derivative works with the * separately licensed software that they have either included with * the program or referenced in the documentation. * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See * the GNU General Public License, version 2.0, for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #pragma once #ifndef _MSC_VER #include <sql.h> #include <sqlext.h> #include <errno.h> #include <stdlib.h> #include <vector> #include <set> #include <map> #include <string> #include <stdexcept> #include <memory> #include <functional> #ifdef __APPLE #pragma GCC diagnostic ignored "-Wdeprecated-register" #endif #endif #include "converter.h" #include "glib.h" #include "base/threading.h" class QueryBuilder { public: void select_columns(const std::string &columns) { _columns = columns; }; void select_from_table(const std::string &table, const std::string &schema = "") { _table = table; _schema = schema; }; void add_limit(const std::string &limit) { _limit = limit; }; void add_orderby(const std::string &orderby) { _orderby = orderby; }; void add_where(const std::string &where) { _where.push_back(where); }; std::string build_query(); private: std::string _orderby; std::string _limit; std::string _schema; std::string _table; std::string _columns; std::vector<std::string> _where; }; class ConnectionError : public std::runtime_error { static std::string process(SQLRETURN retcode, SQLSMALLINT htype, SQLHANDLE handle); public: ConnectionError(const std::string &what, SQLRETURN ret, SQLSMALLINT htype, SQLHANDLE handle) : std::runtime_error(what + ": " + process(ret, htype, handle)) { } ConnectionError(const std::string &what, MYSQL *m) : std::runtime_error(what + ": " + mysql_error(m)) { } ConnectionError(const std::string &what, MYSQL_STMT *m) : std::runtime_error(what + ": " + mysql_stmt_error(m)) { } ConnectionError(const std::string &what, const std::string &error) : std::runtime_error(what + ": " + error) { } }; enum SourceType { ST_MYSQL, ST_ODBC, ST_PYTHON }; struct ColumnInfo { std::string source_name; std::string source_type; enum enum_field_types mapped_source_type; unsigned long long source_length; std::string target_name; enum enum_field_types target_type; bool is_unsigned; bool is_long_data; }; class RowBuffer : public std::vector<MYSQL_BIND> { int _current_field; std::function<void(int, const char *, size_t)> _send_blob_data; RowBuffer(const RowBuffer &o) : std::vector<MYSQL_BIND>(), _current_field(0) { } public: RowBuffer(std::shared_ptr<std::vector<ColumnInfo> > columns, std::function<void(int, const char *, size_t)> send_blob_data, size_t max_packet_size); ~RowBuffer(); void clear(); void prepare_add_string(char *&buffer, size_t &buffer_len, unsigned long *&length); void prepare_add_float(char *&buffer, size_t &buffer_len); void prepare_add_double(char *&buffer, size_t &buffer_len); void prepare_add_bigint(char *&buffer, size_t &buffer_len); void prepare_add_long(char *&buffer, size_t &buffer_len); void prepare_add_short(char *&buffer, size_t &buffer_len); void prepare_add_tiny(char *&buffer, size_t &buffer_len); void prepare_add_time(char *&buffer, size_t &buffer_len); void prepare_add_geometry(char *&buffer, size_t &buffer_len, unsigned long *&length); void finish_field(bool was_null); enum enum_field_types target_type(bool &unsig); bool check_if_blob(); void send_blob_data(const char *data, size_t length); }; enum CopyType { CopyAll, CopyRange, CopyCount, CopyWhere }; struct CopySpec { CopyType type; std::string range_key; std::string where_expression; long long range_start; long long range_end; long long row_count; long long max_count; bool resume; }; struct TableParam { std::string source_schema; std::string source_table; std::string target_schema; std::string target_table; std::string select_expression; std::vector<std::string> source_pk_columns; std::vector<std::string> target_pk_columns; CopySpec copy_spec; }; class CopyDataSource { protected: std::string _schema_name; std::string _table_name; int _block_size; size_t _max_blob_chunk_size; std::vector<char> _blob_buffer; long long _max_parameter_size; bool _abort_on_oversized_blobs; bool _use_bulk_inserts; bool _get_field_lengths_from_target; unsigned int _connection_timeout; public: CopyDataSource(); virtual ~CopyDataSource(){}; void set_block_size(int bsize); void set_max_blob_chunk_size(size_t size); void set_max_parameter_size(unsigned long size) { _max_parameter_size = size; } void set_abort_on_oversized_blobs(bool value) { _abort_on_oversized_blobs = value; } void set_get_field_lengths_from_target(bool value) { _get_field_lengths_from_target = value; } bool get_get_field_lengths_from_target() { return _get_field_lengths_from_target; } void set_bulk_inserts(bool value) { _use_bulk_inserts = value; } std::string get_where_condition(const std::vector<std::string> &pk_columns, const std::vector<std::string> &last_pkeys); virtual size_t count_rows(const std::string &schema, const std::string &table, const std::vector<std::string> &pk_columns, const CopySpec &spec, const std::vector<std::string> &last_pkeys) = 0; virtual std::shared_ptr<std::vector<ColumnInfo> > begin_select_table( const std::string &schema, const std::string &table, const std::vector<std::string> &pk_columns, const std::string &select_expression, const CopySpec &spec, const std::vector<std::string> &last_pkeys) = 0; virtual void end_select_table() = 0; virtual bool fetch_row(RowBuffer &rowbuffer) = 0; }; class ODBCCopyDataSource : public CopyDataSource { SQLHDBC _dbc; std::string _connstring; SQLHSTMT _stmt; std::shared_ptr<std::vector<ColumnInfo> > _columns; std::vector<SQLSMALLINT> _column_types; bool _stmt_ok; int _column_count; bool _force_utf8_input; std::string _source_rdbms_type; SQLSMALLINT odbc_type_to_c_type(SQLSMALLINT type, bool is_unsigned); void ucs2_to_utf8(char *inbuf, size_t inbuf_len, char *&utf8buf, size_t &utf8buf_len); public: ODBCCopyDataSource(SQLHENV env, const std::string &connstring, const std::string &password, bool force_utf8_input, const std::string &source_rdbms_type); virtual ~ODBCCopyDataSource(); SQLRETURN get_wchar_buffer_data(RowBuffer &rowbuffer, int column); SQLRETURN get_char_buffer_data(RowBuffer &rowbuffer, int column); SQLRETURN get_date_time_data(RowBuffer &rowbuffer, int column, int type); SQLRETURN get_geometry_buffer_data(RowBuffer &rowbuffer, int column); public: virtual size_t count_rows(const std::string &schema, const std::string &table, const std::vector<std::string> &pk_columns, const CopySpec &spec, const std::vector<std::string> &last_pkeys); virtual std::shared_ptr<std::vector<ColumnInfo> > begin_select_table( const std::string &schema, const std::string &table, const std::vector<std::string> &pk_columns, const std::string &select_expression, const CopySpec &spec, const std::vector<std::string> &last_pkeys); virtual void end_select_table(); virtual bool fetch_row(RowBuffer &rowbuffer); }; class MySQLCopyDataSource : public CopyDataSource { MYSQL _mysql; MYSQL_STMT *_select_stmt; bool _has_long_data; public: MySQLCopyDataSource(const std::string &hostname, int port, const std::string &username, const std::string &password, const std::string &socket, bool use_cleartext_plugin, const unsigned int connection_timeout); virtual ~MySQLCopyDataSource(); virtual size_t count_rows(const std::string &schema, const std::string &table, const std::vector<std::string> &pk_columns, const CopySpec &spec, const std::vector<std::string> &last_pkeys); virtual std::shared_ptr<std::vector<ColumnInfo> > begin_select_table( const std::string &schema, const std::string &table, const std::vector<std::string> &pk_columns, const std::string &select_expression, const CopySpec &spec, const std::vector<std::string> &last_pkeys); virtual void end_select_table(); virtual bool fetch_row(RowBuffer &rowbuffer); }; class MySQLCopyDataTarget { struct InsertBuffer { MYSQL *_mysql; MySQLCopyDataTarget *_target; char *buffer; size_t length; size_t size; size_t last_insert_length; InsertBuffer(MySQLCopyDataTarget *target) : _target(target), buffer(NULL), length(0), size(0), last_insert_length(0) { } ~InsertBuffer() { if (buffer) free(buffer); } void reset(size_t size); void end_insert(); bool append(const char *data, size_t length); bool append(const char *data); bool append_escaped(const char *data, size_t length); void set_connection(MYSQL *mysql) { _mysql = mysql; } size_t space_left(); }; MYSQL _mysql; MYSQL_STMT *_insert_stmt; std::string _incoming_data_charset; unsigned long _max_allowed_packet; unsigned long _max_long_data_size; std::string _schema; std::string _table; std::shared_ptr<std::vector<ColumnInfo> > _columns; RowBuffer *_row_buffer; bool _truncate; int _major_version; int _minor_version; int _build_version; // Variables used for bulk inserts bool _use_bulk_inserts; bool _init_bulk_insert; bool _get_field_lengths_from_target; std::string _bulk_insert_query; InsertBuffer _bulk_insert_buffer; InsertBuffer _bulk_insert_record; int _bulk_record_count; int _bulk_insert_batch; std::string _source_rdbms_type; unsigned int _connection_timeout; MYSQL_RES *get_server_value(const std::string &variable); void get_server_value(const std::string &variable, std::string &value); void get_server_value(const std::string &variable, unsigned long &value); bool format_bulk_record(); bool append_bulk_column(size_t col_index); void get_server_version(); bool is_mysql_version_at_least(const int _major, const int _minor, const int _build); void send_long_data(int column, const char *data, size_t length); void init(); std::string ps_query(); enum enum_field_types field_type_to_ps_param_type(enum enum_field_types ftype); void get_generated_columns(const std::string &schema, const std::string &table, std::vector<std::string> &gc); public: MySQLCopyDataTarget(const std::string &hostname, int port, const std::string &username, const std::string &password, const std::string &socket, bool use_cleartext_plugin, const std::string &app_name, const std::string &incoming_charset, const std::string &source_rdbms_type, const unsigned int connection_timeout); ~MySQLCopyDataTarget(); size_t get_max_allowed_packet() { return _max_allowed_packet; } size_t get_max_long_data_size() { return _max_long_data_size; } void set_truncate(bool flag); void set_target_table(const std::string &schema, const std::string &table, std::shared_ptr<std::vector<ColumnInfo> > columns); long long get_max_value(const std::string &key); bool bulk_inserts() { return _use_bulk_inserts; } void set_bulk_insert_batch_size(int value) { _bulk_insert_batch = value; } bool get_get_field_lengths_from_target() { return _get_field_lengths_from_target; } void set_get_field_lengths_from_target(bool value) { _get_field_lengths_from_target = value; } void begin_inserts(); int end_inserts(bool flush = true); int do_insert(bool final = false); void restore_triggers(std::set<std::string> &schemas); void backup_triggers(std::set<std::string> &schemas); void backup_triggers_for_schema(const std::string &schema); void get_triggers_for_schema(const std::string &schema, std::map<std::string, std::string> &triggers); bool get_trigger_definitions_for_schema(const std::string &schema, std::map<std::string, std::string> &triggers); void drop_trigger_backups(const std::string &schema); std::vector<std::string> get_last_pkeys(const std::vector<std::string> &pk_columns, const std::string &schema, const std::string &table); RowBuffer &row_buffer(); }; class TaskQueue { private: std::vector<TableParam> _tasks; base::Mutex _task_mutex; public: TaskQueue(); void add_task(const TableParam &task); bool get_task(TableParam &task); size_t size() { return _tasks.size(); } bool empty() { return _tasks.empty(); } }; class CopyDataTask { private: std::string _name; std::unique_ptr<CopyDataSource> _source; std::unique_ptr<MySQLCopyDataTarget> _target; TaskQueue *_tasks; bool _show_progress; GThread *_thread; static gpointer thread_func(gpointer data); void copy_table(const TableParam &task); void report_progress(const std::string &schema, const std::string &table, long long current, long long total); public: CopyDataTask(const std::string name, CopyDataSource *psource, MySQLCopyDataTarget *ptarget, TaskQueue *ptasks, bool show_progress); ~CopyDataTask(); void wait() { g_thread_join(_thread); } };