plugins/migration/copytable/main.cpp (773 lines of code) (raw):
/*
* Copyright (c) 2012, 2021, Oracle and/or its affiliates. All rights reserved.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2.0,
* as published by the Free Software Foundation.
*
* This program is designed to work with certain software (including
* but not limited to OpenSSL) that is licensed under separate terms, as
* designated in a particular file or component or in included license
* documentation. The authors of MySQL hereby grant you an additional
* permission to link the program and your derivative works with the
* separately licensed software that they have either included with
* the program or referenced in the documentation.
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
* the GNU General Public License, version 2.0, for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifdef _MSC_VER
#define HAVE_ROUND
#endif
#ifdef __APPLE__
// All the functions in sql.h are deprecated, but we have no replacement atm.
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#endif
#include "python_copy_data_source.h" // python stuff need to be 1st #include
#include "copytable.h"
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iostream>
#include "base/log.h"
#include "base/sqlstring.h"
#undef tolower
#undef toupper
#include "base/file_utilities.h"
#include "base/string_utilities.h"
#include "workbench/wb_version.h"
#include "SSHTunnelManager.h"
void *get_mainwindow_impl() {
return 0;
}
class input_error : public std::runtime_error {
public:
input_error(const std::string &what) : std::runtime_error(what) {}
};
DEFAULT_LOG_DOMAIN("copytable");
static void count_rows(std::unique_ptr<CopyDataSource> &source,
const std::string &source_schema,
const std::string &source_table,
const std::vector<std::string> &pk_columns,
const CopySpec &spec,
const std::vector<std::string> &last_pkeys) {
unsigned long long total = source->count_rows(source_schema, source_table,
pk_columns, spec, last_pkeys);
printf("ROW_COUNT:%s:%s: %llu\n", source_schema.c_str(), source_table.c_str(),
total);
fflush(stdout);
}
//-----------------------
static bool set_log_level(const std::string &value) {
std::string level = base::tolower(value);
bool ret = base::Logger::active_level(level);
if (ret) // TODO: if the logger is set to error or warning the following log
// call won't do anything.
logInfo("Logger set to level '%s'. '%s'\n", level.c_str(),
base::Logger::get_state().c_str());
return ret;
}
static bool check_arg_with_value(char **argv, int &argi, const char *arg,
char *&value, bool arg_required) {
char *a = argv[argi];
if (strcmp(a, arg) == 0) {
// value must be in next arg
if (argv[argi + 1] != NULL) {
++argi;
value = argv[argi];
} else {
value = NULL;
if (arg_required) {
fprintf(stderr, "Missing argument for option %s\n", argv[argi]);
return false;
}
}
return true;
} else if (strncmp(a, arg, strlen(arg)) == 0 && a[strlen(arg)] == '=') {
// value must be after =
value = a + strlen(arg) + 1;
return true;
}
return false;
}
static bool parse_mysql_connstring(const std::string &connstring,
std::string &user, std::string &password,
std::string &host, int &port,
std::string &sock) {
// Format is user[:pass]@host:port or user[:pass]@::socket,
// like what cmdline utilities use.
std::string::size_type p = connstring.rfind('@');
if (p == std::string::npos)
return false;
std::string user_part = connstring.substr(0, p);
std::string server_part = connstring.substr(p + 1);
if ((p = user_part.find(':')) != std::string::npos) {
user = user_part.substr(0, p);
password = user_part.substr(p + 1);
} else
user = user_part;
p = server_part.find(':');
if (p != std::string::npos) {
host = server_part.substr(0, p);
server_part = server_part.substr(p + 1);
p = server_part.find(':');
if (p != std::string::npos)
sock = server_part.substr(p + 1);
else if (!sscanf(server_part.substr(0, p).c_str(), "%i", &port))
return false;
} else
host = server_part;
return true;
}
static void show_help() {
printf("copytable --*-source=<source db> --target=<target db> <options> "
"<table spec> [<table spec> ...]\n");
printf("--odbc-source=<odbc connstring>\n");
printf("--pythondbapi-source=<python connstring>\n");
printf("--mysql-source=<mysql connstring>\n");
printf("--source-password=<password>\n");
printf("--source-ssh-port=<ssh port>\n");
printf("--source-ssh-host=<ssh host>\n");
printf("--source-ssh-user=<ssh user>\n");
printf("--source-ssh-password=<ssh password>\n");
printf("--source-ssh-key=<path to ssh key>\n");
printf("--target=<mysql connstring>\n");
printf("--target-password=<password>\n");
printf("--target-ssh-port=<ssh port>\n");
printf("--target-ssh-host=<ssh host>\n");
printf("--target-ssh-user=<ssh user>\n");
printf("--target-ssh-password=<ssh password>\n");
printf("--target-ssh-key=<path to ssh key>\n");
printf("--ssh-known-hosts-file=<path to ssh known hosts file>\n");
printf("--ssh-config-file=<path to ssh config file>\n");
printf("--force-utf8-for-source\n");
printf("--truncate-target\n");
printf("--progress\n");
printf("--count-only\n");
printf("--jobs-from-stdin\n");
printf("--abort-on-oversized-blobs\n");
printf("--max-count=<max rows count>\n");
printf("--resume\n");
printf("Table Specification from file:\n");
printf("--table-file=<filename>\n");
printf("<source schema><TAB><source table><TAB><target schema><TAB><target "
"table><TAB><source pk columns><TAB><target pk "
"columns><TAB>*|<select expression>\n");
printf("Table Specification from command line:\n");
printf("--table <source schema> <source table> <target schema> <target "
"table> <source pk columns> <target pk columns> "
"*|<select expression>\n");
printf("--table-where <source schema> <source table> <target schema> <target "
"table> <source pk columns> <target pk "
"columns> *|<select expression> <where expression>\n");
printf("\n");
printf("--log-file=<file_path>\n");
printf("--log-level=<level>\n");
printf("--thread-count=<count>\n");
printf("--bulk-insert-batch-size=<size>\n");
printf("--disable-triggers-on=<schema>\n");
printf("--reenable-triggers-on=<schema>\n");
printf("--dont-disable-triggers");
printf("--version\n");
printf("--help\n");
}
/*
* read_tasks_from_file : reads the table information from a text file.
* Parameters:
* - file_name : the file containing the table definitions
* - count_only : indicates if the file contains information to count the records
* from the source DB or to actually trasmit the data
* - tasks : output parameter that will contain a task for each table definition
loaded
* from the file
- resume : indicates if the file contains information to resume copying data
from
last PK
* - max_count : limit copied rows count to max_count
*
* Remarks : Each table is defined in a single line with the next format for
count_only = true and resume = false
* <src_schema>\t<src_table>\n
*
* and in the next format for a count_only = false or resume = true
*
<src_schema>\t<src_table>\t<tgt_schema>\t<tgt_table>\t<source_pk_columns>\t<target_pk_columns>\t<select_expression>
*/
bool read_tasks_from_file(const std::string file_name, bool count_only,
TaskQueue &tasks,
std::set<std::string> &trigger_schemas, bool resume,
long long int max_count) {
std::ifstream ifs(file_name.data(), std::ifstream::in);
unsigned int field_count = count_only && !resume ? 2 : 7;
bool error = false;
printf("Loading table information from file %s\n", file_name.data());
while (!error && ifs.good()) {
TableParam param;
std::string line;
getline(ifs, line);
if (line.length()) {
logInfo("--table %s\n", line.data());
std::vector<std::string> fields = base::split(line, "\t", field_count);
if (fields.size() == field_count) {
param.source_schema = fields[0];
param.source_table = fields[1];
if (!(count_only && !resume)) {
param.target_schema = fields[2];
param.target_table = fields[3];
if (std::strcmp(fields[4].c_str(), "-") != 0)
param.source_pk_columns = base::split(fields[4], ",", -1);
if (std::strcmp(fields[5].c_str(), "-") != 0)
param.target_pk_columns = base::split(fields[5], ",", -1);
param.select_expression = fields[6];
trigger_schemas.insert(param.target_schema);
}
param.copy_spec.resume = resume;
param.copy_spec.max_count = max_count;
param.copy_spec.type = CopyAll;
tasks.add_task(param);
} else
error = true;
}
}
ifs.close();
return !error;
}
uint16_t createTunnel(ssh::SSHConnectionConfig &config,
const ssh::SSHConnectionCredentials &credentials,
ssh::SSHTunnelManager *manager) {
uint16_t tunnel_port = manager->lookupTunnel(config);
if (tunnel_port > 0) {
logInfo("Existing SSH tunnel found, connecting...\n");
return tunnel_port;
}
auto session = ssh::SSHSession::createSession();
std::tuple<ssh::SSHReturnType, base::any> retVal;
bool connected = false;
while (!connected) {
try {
retVal = session->connect(config, credentials);
} catch (std::exception &e) {
throw std::runtime_error(e.what());
}
std::string errorMsg = "";
switch (std::get<0>(retVal)) {
case ssh::SSHReturnType::CONNECTED: {
connected = true;
break;
}
case ssh::SSHReturnType::FINGERPRINT_CHANGED:
case ssh::SSHReturnType::FINGERPRINT_MISMATCH: {
std::string fingerprint = std::get<1>(retVal);
errorMsg = "WARNING: Server public key has changed. It means either "
"you're under attack or the administrator has changed the "
"key. New public fingerprint is: " +
fingerprint;
throw std::runtime_error(errorMsg);
break;
}
case ssh::SSHReturnType::FINGERPRINT_UNKNOWN:
case ssh::SSHReturnType::FINGERPRINT_UNKNOWN_AUTH_FILE_MISSING: {
std::string fingerprint = std::get<1>(retVal);
std::string msg = "The authenticity of host '" + config.remoteSSHhost +
"' can't be established.\n Server key fingerprint is " +
fingerprint +
"\nAre you sure you want to continue connecting? [y/N]";
std::cout << msg;
char retChar = 'N';
std::cin >> retChar;
if (toupper(retChar) == 'Y') {
config.fingerprint = fingerprint;
session->disconnect();
} else {
errorMsg = "The authenticity of host '" + config.remoteSSHhost +
"' can't be established.";
throw std::runtime_error(errorMsg);
}
break;
}
default: {
std::string errorMsg = std::get<1>(retVal);
throw std::runtime_error(errorMsg);
}
}
}
try {
retVal = manager->createTunnel(session);
} catch (ssh::SSHTunnelException &se) {
std::string errMsg =
std::string("Unable to create tunnel: ").append(se.what());
throw std::runtime_error(errMsg);
}
if (std::get<0>(retVal) != ssh::SSHReturnType::CONNECTED) {
std::string errorMsg = std::get<1>(retVal);
throw std::runtime_error(errorMsg);
}
return (uint16_t)std::get<1>(retVal);
}
int main(int argc, char **argv) {
std::string app_name = base::basename(argv[0]);
TaskQueue tables;
std::string source_password;
std::string source_connstring;
bool source_use_cleartext_plugin = false;
bool source_is_utf8 = false;
std::string source_charset;
SourceType source_type = ST_MYSQL;
std::string target_connstring;
std::string target_password;
bool target_use_cleartext_plugin = false;
std::string log_level;
std::string log_file;
bool passwords_from_stdin = false;
bool count_only = false;
bool check_types_only = false;
bool truncate_target = false;
bool show_progress = false;
bool abort_on_oversized_blobs = false;
bool disable_triggers = false;
bool reenable_triggers = false;
bool disable_triggers_on_copy = true;
bool resume = false;
int thread_count = 1;
long long bulk_insert_batch = 100;
long long max_count = 0;
std::string table_file;
std::set<std::string> trigger_schemas;
std::string source_rdbms_type = "unknown";
unsigned int target_connection_timeout = 60;
unsigned int source_connection_timeout = 60;
ssh::SSHConnectionConfig sourceConfig;
sourceConfig.localhost = "127.0.0.1";
sourceConfig.remotehost = "127.0.0.1";
sourceConfig.remoteport = 0;
sourceConfig.remoteSSHhost = "";
sourceConfig.remoteSSHport = 22;
sourceConfig.connectTimeout = 10;
sourceConfig.bufferSize = 10240;
sourceConfig.connectTimeout = 10;
sourceConfig.readWriteTimeout = 5;
sourceConfig.commandTimeout = 1;
sourceConfig.commandRetryCount = 3;
sourceConfig.compressionLevel = 0;
ssh::SSHConnectionCredentials sourceCredentials;
sourceCredentials.username = "";
sourceCredentials.password = "";
sourceCredentials.auth = ssh::SSHAuthtype::PASSWORD;
ssh::SSHConnectionConfig targetConfig;
targetConfig.localhost = "127.0.0.1";
targetConfig.remotehost = "127.0.0.1";
targetConfig.remoteport = 0;
targetConfig.remoteSSHhost = "";
targetConfig.remoteSSHport = 22;
targetConfig.connectTimeout = 10;
targetConfig.bufferSize = 10240;
targetConfig.connectTimeout = 10;
targetConfig.readWriteTimeout = 5;
targetConfig.commandTimeout = 1;
targetConfig.commandRetryCount = 3;
targetConfig.compressionLevel = 0;
ssh::SSHConnectionCredentials targetCredentials;
targetCredentials.username = "";
targetCredentials.password = "";
targetCredentials.auth = ssh::SSHAuthtype::PASSWORD;
bool log_level_set = false;
int i = 1;
while (i < argc) {
char *argval = NULL;
if (check_arg_with_value(argv, i, "--log-level", argval, true))
log_level = argval;
else if (check_arg_with_value(argv, i, "--log-file", argval, true))
log_file = argval;
else if (check_arg_with_value(argv, i, "--odbc-source", argval, true)) {
source_type = ST_ODBC;
source_connstring = base::trim(argval, "\"");
} else if (check_arg_with_value(argv, i, "--mysql-source", argval, true)) {
source_type = ST_MYSQL;
source_connstring = base::trim(argval, "\"");
} else if (check_arg_with_value(argv, i, "--pythondbapi-source", argval, true)) {
source_type = ST_PYTHON;
source_connstring = base::trim(argval, "\"");
} else if (check_arg_with_value(argv, i, "--source-password", argval, true))
source_password = argval;
else if (check_arg_with_value(argv, i, "--target-password", argval, true))
target_password = argval;
else if (strcmp(argv[i], "--force-utf8-for-source") == 0)
source_is_utf8 = true;
else if (check_arg_with_value(argv, i, "--source-charset", argval, true))
source_charset = argval;
else if (strcmp(argv[i], "--progress") == 0)
show_progress = true;
else if (strcmp(argv[i], "--truncate-target") == 0)
truncate_target = true;
else if (strcmp(argv[i], "--count-only") == 0) {
// Count only will be allowed only if one of the trigger
// operations has not been indicated first
if (!disable_triggers && !reenable_triggers)
count_only = true;
} else if (strcmp(argv[i], "--check-types-only") == 0)
check_types_only = true;
else if (strcmp(argv[i], "--passwords-from-stdin") == 0)
passwords_from_stdin = true;
else if (strcmp(argv[i], "--abort-on-oversized-blobs") == 0)
abort_on_oversized_blobs = true;
else if (strcmp(argv[i], "--dont-disable-triggers") == 0)
disable_triggers_on_copy = false;
else if (strcmp(argv[i], "--resume") == 0)
resume = true;
else if (check_arg_with_value(argv, i, "--disable-triggers-on", argval, true)) {
// disabling/enabling triggers are standalone operations and mutually
// exclusive
// so here it ensures a request for trigger enabling was not found first
if (!reenable_triggers && !count_only) {
disable_triggers = true;
trigger_schemas.insert(argval);
}
} else if (check_arg_with_value(argv, i, "--reenable-triggers-on", argval, true)) {
// disabling/enabling triggers are standalone operations and mutually
// exclusive
// so here it ensures a request for trigger enabling was not found first
if (!disable_triggers && !count_only) {
reenable_triggers = true;
trigger_schemas.insert(argval);
}
} else if (check_arg_with_value(argv, i, "--thread-count", argval, true)) {
thread_count = base::atoi<int>(argval, 0);
if (thread_count < 1)
thread_count = 1;
} else if (check_arg_with_value(argv, i, "--bulk-insert-batch-size", argval, true)) {
bulk_insert_batch = base::atoi<int>(argval, 0);
if (bulk_insert_batch < 1)
bulk_insert_batch = 100;
} else if (check_arg_with_value(argv, i, "--source-ssh-port", argval, true))
sourceConfig.remoteSSHport = base::atoi<int>(argval, 0);
else if (check_arg_with_value(argv, i, "--source-ssh-host", argval, true))
sourceConfig.remoteSSHhost = argval;
else if (check_arg_with_value(argv, i, "--source-ssh-user", argval, true))
sourceCredentials.username = argval;
else if (check_arg_with_value(argv, i, "--source-ssh-password", argval, true)) {
sourceCredentials.password = argval;
sourceCredentials.auth = ssh::SSHAuthtype::PASSWORD;
} else if (check_arg_with_value(argv, i, "--source-ssh-key", argval, true)) {
sourceCredentials.keyfile = argval;
sourceCredentials.auth = ssh::SSHAuthtype::KEYFILE;
} else if (check_arg_with_value(argv, i, "--target-ssh-port", argval, true))
targetConfig.remoteSSHport = base::atoi<int>(argval, 0);
else if (check_arg_with_value(argv, i, "--target-ssh-host", argval, true))
targetConfig.remoteSSHhost = argval;
else if (check_arg_with_value(argv, i, "--target-ssh-user", argval, true))
targetCredentials.username = argval;
else if (check_arg_with_value(argv, i, "--target-ssh-password", argval, true)) {
targetCredentials.password = argval;
targetCredentials.auth = ssh::SSHAuthtype::PASSWORD;
} else if (check_arg_with_value(argv, i, "--target-ssh-key", argval, true)) {
targetCredentials.keyfile = argval;
targetCredentials.auth = ssh::SSHAuthtype::KEYFILE;
} else if (check_arg_with_value(argv, i, "--ssh-known-hosts-file", argval, true)) {
sourceConfig.knownHostsFile = targetConfig.knownHostsFile = argval;
} else if (check_arg_with_value(argv, i, "--ssh-config-file", argval, true)) {
sourceConfig.configFile = targetConfig.configFile = argval;
} else if (strcmp(argv[i], "--version") == 0) {
const char *type = APP_EDITION_NAME;
if (strcmp(APP_EDITION_NAME, "Community") ==
(0)) // Extra parens to silence warning.
type = "CE";
printf("%s %s (%s) %i.%i.%i %s build %i\n",
base::basename(argv[0]).c_str(), type, APP_LICENSE_TYPE,
APP_MAJOR_NUMBER, APP_MINOR_NUMBER, APP_RELEASE_NUMBER,
APP_RELEASE_TYPE, APP_BUILD_NUMBER);
exit(0);
} else if (strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "-h") == 0) {
show_help();
exit(0);
} else if (check_arg_with_value(argv, i, "--target", argval, true)) {
target_connstring = base::trim(argval, "\"");
} else if (check_arg_with_value(argv, i, "--table-file", argval, true))
table_file = argval;
else if (strcmp(argv[i], "--table") == 0) {
TableParam param;
if ((!count_only && i + 7 >= argc) || (count_only && i + 2 >= argc)) {
fprintf(stderr, "%s: Missing value for table copy specification\n",
argv[0]);
exit(1);
}
param.source_schema = argv[++i];
param.source_table = argv[++i];
if (!(count_only && !resume)) {
param.target_schema = argv[++i];
param.target_table = argv[++i];
if (std::strcmp(argv[++i], "-") != 0)
param.source_pk_columns = base::split(argv[i], ",", -1);
if (std::strcmp(argv[++i], "-") != 0)
param.target_pk_columns = base::split(argv[i], ",", -1);
param.select_expression = argv[++i];
trigger_schemas.insert(param.target_schema);
}
param.copy_spec.resume = resume;
param.copy_spec.max_count = max_count;
param.copy_spec.type = CopyAll;
tables.add_task(param);
} else if (strcmp(argv[i], "--table-range") == 0) {
TableParam param;
if ((!count_only && i + 10 >= argc) || (count_only && i + 5 >= argc)) {
fprintf(stderr, "%s: Missing value for table copy specification\n",
argv[0]);
exit(1);
}
param.source_schema = argv[++i];
param.source_table = argv[++i];
if (!(count_only && !resume)) {
param.target_schema = argv[++i];
param.target_table = argv[++i];
if (std::strcmp(argv[++i], "-") != 0)
param.source_pk_columns = base::split(argv[i], ",", -1);
if (std::strcmp(argv[++i], "-") != 0)
param.target_pk_columns = base::split(argv[i], ",", -1);
param.select_expression = argv[++i];
trigger_schemas.insert(param.target_schema);
}
param.copy_spec.range_key = argv[++i];
param.copy_spec.range_start = base::atoi<long long>(argv[++i], 0ll);
param.copy_spec.range_end = base::atoi<long long>(argv[++i], 0ll);
param.copy_spec.type = CopyRange;
tables.add_task(param);
} else if (strcmp(argv[i], "--table-row-count") == 0) {
TableParam param;
if ((!count_only && i + 8 >= argc) || (count_only && i + 3 >= argc)) {
fprintf(stderr, "%s: Missing value for table copy specification\n",
argv[0]);
exit(1);
}
param.source_schema = argv[++i];
param.source_table = argv[++i];
if (!(count_only && !resume)) {
param.target_schema = argv[++i];
param.target_table = argv[++i];
if (std::strcmp(argv[++i], "-") != 0)
param.source_pk_columns = base::split(argv[i], ",", -1);
if (std::strcmp(argv[++i], "-") != 0)
param.target_pk_columns = base::split(argv[i], ",", -1);
param.select_expression = argv[++i];
}
param.copy_spec.row_count = base::atoi<long long>(argv[++i], 0ll);
param.copy_spec.resume = resume;
param.copy_spec.type = CopyCount;
tables.add_task(param);
} else if (check_arg_with_value(argv, i, "--source-rdbms-type", argval,
false))
source_rdbms_type = argval;
else if (strcmp(argv[i], "--table-where") == 0) {
TableParam param;
if ((!count_only && i + 8 >= argc) || (count_only && i + 4 >= argc)) {
fprintf(stderr, "%s: Missing value for table copy specification\n", argv[0]);
exit(1);
}
param.source_schema = argv[++i];
param.source_table = argv[++i];
if (!(count_only && !resume)) {
param.target_schema = argv[++i];
param.target_table = argv[++i];
if (std::strcmp(argv[++i], "-") != 0)
param.source_pk_columns = base::split(argv[i], ",", -1);
if (std::strcmp(argv[++i], "-") != 0)
param.target_pk_columns = base::split(argv[i], ",", -1);
param.select_expression = argv[++i];
param.copy_spec.where_expression = argv[++i];
trigger_schemas.insert(param.target_schema);
} else {
param.select_expression = argv[++i];
param.copy_spec.where_expression = argv[++i];
}
param.copy_spec.type = CopyWhere;
tables.add_task(param);
} else if (check_arg_with_value(argv, i, "--max-count", argval, true)) {
max_count = base::atoi<int>(argval, 0);
} else if (strcmp(argv[i], "--source-use-cleartext") == 0)
source_use_cleartext_plugin = true;
else if (strcmp(argv[i], "--target-use-cleartext") == 0)
target_use_cleartext_plugin = true;
else if (check_arg_with_value(argv, i, "--target-timeout", argval, true))
target_connection_timeout = base::atoi<int>(argval, 0);
else if (check_arg_with_value(argv, i, "--source-timeout", argval, true))
source_connection_timeout = base::atoi<int>(argval, 0);
else {
fprintf(stderr, "%s: Invalid option %s\n", argv[0], argv[i]);
exit(1);
}
i++;
}
// Creates the log to the target file if any, if not
// uses std_error
base::Logger logger(true, log_file);
if (!log_level.empty()) {
if (!set_log_level(log_level)) {
fprintf(stderr, "%s: invalid argument '%s' for option %s\n", argv[0], log_level.data(), "--log-level");
exit(1);
} else
log_level_set = true;
}
// Set the log level from environment var WB_LOG_LEVEL if specified or set a
// default log level.
if (!log_level_set) {
const char *log_setting = getenv("WB_LOG_LEVEL");
if (log_setting == NULL)
log_setting = "info";
else
log_level_set = true;
std::string level = base::tolower(log_setting);
base::Logger::active_level(level);
}
// If needed, reads the tasks from the table definition file
if (!table_file.empty()) {
if (!read_tasks_from_file(table_file, count_only, tables, trigger_schemas, resume, max_count)) {
fprintf(stderr, "Invalid table definitions format in file: %s\n", table_file.data());
exit(1);
}
}
// Not having the source connection data is an error unless
// the standalone operations to disable or reenable triggers
// are called
if (source_connstring.empty() && !reenable_triggers && !disable_triggers) {
fprintf(stderr, "Missing source DB server\n");
exit(1);
}
if (target_connstring.empty() && !(count_only && !resume)) {
fprintf(stderr, "Missing target DB server\n");
exit(1);
}
// Table definitions will be required only if the standalone operations to
// Reenable or disable triggers are not called
if (tables.empty() && !reenable_triggers && !disable_triggers) {
logWarning("Missing table list specification\n");
exit(0);
}
std::string source_host;
std::string source_user;
int source_port = -1;
std::string source_socket;
// Source connection is parsed only when NOT executing the
// Standalone operatios on triggers
if (source_type == ST_MYSQL && !reenable_triggers && !disable_triggers) {
if (!parse_mysql_connstring(source_connstring, source_user, source_password,
source_host, source_port, source_socket)) {
fprintf(stderr, "Invalid MySQL connection string %s for source database. "
"Must be in format user[:pass]@host:port or "
"user[:pass]@::socket\n",
source_connstring.c_str());
exit(1);
} else {
sourceConfig.remotehost = source_host;
sourceConfig.remoteport = source_port;
}
}
std::string target_host;
std::string target_user;
int target_port = -1;
std::string target_socket;
if (!(count_only && !resume) &&
!parse_mysql_connstring(target_connstring, target_user, target_password,
target_host, target_port, target_socket)) {
fprintf(stderr, "Invalid MySQL connection string %s for target database. "
"Must be in format user[:pass]@host:port or "
"user[:pass]@::socket\n",
target_connstring.c_str());
exit(1);
} else {
targetConfig.remotehost = target_host;
targetConfig.remoteport = target_port;
}
if (passwords_from_stdin) {
char password[200];
if (!fgets(password, sizeof(password), stdin)) {
logError("Error reading passwords from stdin\n");
exit(1);
}
if ((count_only && !resume) || reenable_triggers || disable_triggers) {
char *ptr = strtok(password, "\t\r\n");
if (ptr) {
if (count_only)
source_password = ptr;
else
target_password = ptr;
}
} else {
char *ptr = strtok(password, "\r\n");
if (ptr) {
ptr = strchr(password, '\t');
if (ptr) {
source_password = std::string(password, ptr - password);
target_password = ptr + 1;
} else
source_password = password;
}
}
}
static SQLHENV odbc_env;
ssh::SSHTunnelManager *manager = nullptr;
if (!sourceConfig.remoteSSHhost.empty() || !targetConfig.remoteSSHhost.empty()) {
manager = new ssh::SSHTunnelManager();
manager->start();
if (!sourceConfig.remoteSSHhost.empty()) {
source_port = createTunnel(sourceConfig, sourceCredentials, manager);
source_host = "127.0.0.1";
}
if (!targetConfig.remoteSSHhost.empty()) {
target_port = createTunnel(targetConfig, targetCredentials, manager);
target_host = "127.0.0.1";
}
}
PyThreadState *state = NULL;
if (source_type == ST_PYTHON) {
Py_Initialize();
//PyEval_InitThreads();
state = PyEval_SaveThread();
}
try {
if (count_only) {
std::unique_ptr<CopyDataSource> psource;
if (source_type == ST_ODBC) {
SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &odbc_env);
SQLSetEnvAttr(odbc_env, SQL_ATTR_ODBC_VERSION, (void *)SQL_OV_ODBC3, 0);
psource.reset(new ODBCCopyDataSource(odbc_env, source_connstring,
source_password, source_is_utf8,
source_rdbms_type));
} else if (source_type == ST_MYSQL)
psource.reset(new MySQLCopyDataSource(
source_host, source_port, source_user, source_password,
source_socket, source_use_cleartext_plugin,
source_connection_timeout));
else
psource.reset(
new PythonCopyDataSource(source_connstring, source_password));
std::unique_ptr<MySQLCopyDataTarget> ptarget;
TableParam task;
while (tables.get_task(task)) {
std::vector<std::string> last_pkeys;
if (task.copy_spec.resume) {
if (!ptarget.get())
ptarget.reset(new MySQLCopyDataTarget(
target_host, target_port, target_user, target_password,
target_socket, target_use_cleartext_plugin, app_name,
source_charset, source_rdbms_type, target_connection_timeout));
last_pkeys = ptarget->get_last_pkeys(task.target_pk_columns, task.target_schema, task.target_table);
}
count_rows(psource, task.source_schema, task.source_table, task.source_pk_columns, task.copy_spec, last_pkeys);
}
} else if (reenable_triggers || disable_triggers) {
std::unique_ptr<MySQLCopyDataTarget> ptarget;
ptarget.reset(new MySQLCopyDataTarget(
target_host, target_port, target_user, target_password, target_socket,
target_use_cleartext_plugin, app_name, source_charset,
source_rdbms_type, target_connection_timeout));
if (disable_triggers)
ptarget->backup_triggers(trigger_schemas);
else
ptarget->restore_triggers(trigger_schemas);
} else {
std::vector<CopyDataTask *> threads;
std::unique_ptr<MySQLCopyDataTarget> ptarget_conn;
MySQLCopyDataTarget *ptarget = NULL;
CopyDataSource *psource = NULL;
if (disable_triggers_on_copy) {
ptarget_conn.reset(new MySQLCopyDataTarget(
target_host, target_port, target_user, target_password,
target_socket, target_use_cleartext_plugin, app_name,
source_charset, source_rdbms_type, target_connection_timeout));
ptarget_conn->backup_triggers(trigger_schemas);
}
for (int index = 0; index < thread_count; index++) {
if (source_type == ST_ODBC) {
SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &odbc_env);
SQLSetEnvAttr(odbc_env, SQL_ATTR_ODBC_VERSION, (void *)SQL_OV_ODBC3, 0);
psource = new ODBCCopyDataSource(odbc_env, source_connstring,
source_password, source_is_utf8,
source_rdbms_type);
} else if (source_type == ST_MYSQL)
psource = new MySQLCopyDataSource(
source_host, source_port, source_user, source_password,
source_socket, source_use_cleartext_plugin,
source_connection_timeout);
else
psource = new PythonCopyDataSource(source_connstring, source_password);
ptarget = new MySQLCopyDataTarget(
target_host, target_port, target_user, target_password,
target_socket, target_use_cleartext_plugin, app_name,
source_charset, source_rdbms_type, target_connection_timeout);
psource->set_max_blob_chunk_size(ptarget->get_max_allowed_packet());
psource->set_max_parameter_size((unsigned long)ptarget->get_max_long_data_size());
psource->set_abort_on_oversized_blobs(abort_on_oversized_blobs);
ptarget->set_truncate(truncate_target);
if (max_count > 0)
bulk_insert_batch = max_count;
ptarget->set_bulk_insert_batch_size((int)bulk_insert_batch);
if (check_types_only) {
// XXXX
delete psource;
} else {
threads.push_back(new CopyDataTask(base::strfmt("Task %d", index + 1),
psource, ptarget, &tables,
show_progress));
}
}
// Waits for all the threads to complete
for (size_t index = 0; index < threads.size(); index++)
threads[index]->wait();
// Finally destroys the threads and connections
for (size_t index = 0; index < threads.size(); index++)
delete threads[index];
// Finally restores the triggers
if (disable_triggers_on_copy)
ptarget_conn->restore_triggers(trigger_schemas);
}
} catch (std::exception &e) {
logError("Exception: %s\n", e.what());
if (source_type == ST_PYTHON) {
PyEval_RestoreThread(state);
Py_Finalize();
}
exit(1);
}
if (source_type == ST_PYTHON) {
PyEval_RestoreThread(state);
Py_Finalize();
}
if (manager) {
manager->setStop();
manager->pokeWakeupSocket();
if (manager->isRunning())
manager->join();
delete manager;
manager = nullptr;
}
printf("FINISHED\n");
fflush(stdout);
return 0;
}