in plugins/migration/copytable/main.cpp [355:954]
int main(int argc, char **argv) {
std::string app_name = base::basename(argv[0]);
TaskQueue tables;
std::string source_password;
std::string source_connstring;
bool source_use_cleartext_plugin = false;
bool source_is_utf8 = false;
std::string source_charset;
SourceType source_type = ST_MYSQL;
std::string target_connstring;
std::string target_password;
bool target_use_cleartext_plugin = false;
std::string log_level;
std::string log_file;
bool passwords_from_stdin = false;
bool count_only = false;
bool check_types_only = false;
bool truncate_target = false;
bool show_progress = false;
bool abort_on_oversized_blobs = false;
bool disable_triggers = false;
bool reenable_triggers = false;
bool disable_triggers_on_copy = true;
bool resume = false;
int thread_count = 1;
long long bulk_insert_batch = 100;
long long max_count = 0;
std::string table_file;
std::set<std::string> trigger_schemas;
std::string source_rdbms_type = "unknown";
unsigned int target_connection_timeout = 60;
unsigned int source_connection_timeout = 60;
ssh::SSHConnectionConfig sourceConfig;
sourceConfig.localhost = "127.0.0.1";
sourceConfig.remotehost = "127.0.0.1";
sourceConfig.remoteport = 0;
sourceConfig.remoteSSHhost = "";
sourceConfig.remoteSSHport = 22;
sourceConfig.connectTimeout = 10;
sourceConfig.bufferSize = 10240;
sourceConfig.connectTimeout = 10;
sourceConfig.readWriteTimeout = 5;
sourceConfig.commandTimeout = 1;
sourceConfig.commandRetryCount = 3;
sourceConfig.compressionLevel = 0;
ssh::SSHConnectionCredentials sourceCredentials;
sourceCredentials.username = "";
sourceCredentials.password = "";
sourceCredentials.auth = ssh::SSHAuthtype::PASSWORD;
ssh::SSHConnectionConfig targetConfig;
targetConfig.localhost = "127.0.0.1";
targetConfig.remotehost = "127.0.0.1";
targetConfig.remoteport = 0;
targetConfig.remoteSSHhost = "";
targetConfig.remoteSSHport = 22;
targetConfig.connectTimeout = 10;
targetConfig.bufferSize = 10240;
targetConfig.connectTimeout = 10;
targetConfig.readWriteTimeout = 5;
targetConfig.commandTimeout = 1;
targetConfig.commandRetryCount = 3;
targetConfig.compressionLevel = 0;
ssh::SSHConnectionCredentials targetCredentials;
targetCredentials.username = "";
targetCredentials.password = "";
targetCredentials.auth = ssh::SSHAuthtype::PASSWORD;
bool log_level_set = false;
int i = 1;
while (i < argc) {
char *argval = NULL;
if (check_arg_with_value(argv, i, "--log-level", argval, true))
log_level = argval;
else if (check_arg_with_value(argv, i, "--log-file", argval, true))
log_file = argval;
else if (check_arg_with_value(argv, i, "--odbc-source", argval, true)) {
source_type = ST_ODBC;
source_connstring = base::trim(argval, "\"");
} else if (check_arg_with_value(argv, i, "--mysql-source", argval, true)) {
source_type = ST_MYSQL;
source_connstring = base::trim(argval, "\"");
} else if (check_arg_with_value(argv, i, "--pythondbapi-source", argval, true)) {
source_type = ST_PYTHON;
source_connstring = base::trim(argval, "\"");
} else if (check_arg_with_value(argv, i, "--source-password", argval, true))
source_password = argval;
else if (check_arg_with_value(argv, i, "--target-password", argval, true))
target_password = argval;
else if (strcmp(argv[i], "--force-utf8-for-source") == 0)
source_is_utf8 = true;
else if (check_arg_with_value(argv, i, "--source-charset", argval, true))
source_charset = argval;
else if (strcmp(argv[i], "--progress") == 0)
show_progress = true;
else if (strcmp(argv[i], "--truncate-target") == 0)
truncate_target = true;
else if (strcmp(argv[i], "--count-only") == 0) {
// Count only will be allowed only if one of the trigger
// operations has not been indicated first
if (!disable_triggers && !reenable_triggers)
count_only = true;
} else if (strcmp(argv[i], "--check-types-only") == 0)
check_types_only = true;
else if (strcmp(argv[i], "--passwords-from-stdin") == 0)
passwords_from_stdin = true;
else if (strcmp(argv[i], "--abort-on-oversized-blobs") == 0)
abort_on_oversized_blobs = true;
else if (strcmp(argv[i], "--dont-disable-triggers") == 0)
disable_triggers_on_copy = false;
else if (strcmp(argv[i], "--resume") == 0)
resume = true;
else if (check_arg_with_value(argv, i, "--disable-triggers-on", argval, true)) {
// disabling/enabling triggers are standalone operations and mutually
// exclusive
// so here it ensures a request for trigger enabling was not found first
if (!reenable_triggers && !count_only) {
disable_triggers = true;
trigger_schemas.insert(argval);
}
} else if (check_arg_with_value(argv, i, "--reenable-triggers-on", argval, true)) {
// disabling/enabling triggers are standalone operations and mutually
// exclusive
// so here it ensures a request for trigger enabling was not found first
if (!disable_triggers && !count_only) {
reenable_triggers = true;
trigger_schemas.insert(argval);
}
} else if (check_arg_with_value(argv, i, "--thread-count", argval, true)) {
thread_count = base::atoi<int>(argval, 0);
if (thread_count < 1)
thread_count = 1;
} else if (check_arg_with_value(argv, i, "--bulk-insert-batch-size", argval, true)) {
bulk_insert_batch = base::atoi<int>(argval, 0);
if (bulk_insert_batch < 1)
bulk_insert_batch = 100;
} else if (check_arg_with_value(argv, i, "--source-ssh-port", argval, true))
sourceConfig.remoteSSHport = base::atoi<int>(argval, 0);
else if (check_arg_with_value(argv, i, "--source-ssh-host", argval, true))
sourceConfig.remoteSSHhost = argval;
else if (check_arg_with_value(argv, i, "--source-ssh-user", argval, true))
sourceCredentials.username = argval;
else if (check_arg_with_value(argv, i, "--source-ssh-password", argval, true)) {
sourceCredentials.password = argval;
sourceCredentials.auth = ssh::SSHAuthtype::PASSWORD;
} else if (check_arg_with_value(argv, i, "--source-ssh-key", argval, true)) {
sourceCredentials.keyfile = argval;
sourceCredentials.auth = ssh::SSHAuthtype::KEYFILE;
} else if (check_arg_with_value(argv, i, "--target-ssh-port", argval, true))
targetConfig.remoteSSHport = base::atoi<int>(argval, 0);
else if (check_arg_with_value(argv, i, "--target-ssh-host", argval, true))
targetConfig.remoteSSHhost = argval;
else if (check_arg_with_value(argv, i, "--target-ssh-user", argval, true))
targetCredentials.username = argval;
else if (check_arg_with_value(argv, i, "--target-ssh-password", argval, true)) {
targetCredentials.password = argval;
targetCredentials.auth = ssh::SSHAuthtype::PASSWORD;
} else if (check_arg_with_value(argv, i, "--target-ssh-key", argval, true)) {
targetCredentials.keyfile = argval;
targetCredentials.auth = ssh::SSHAuthtype::KEYFILE;
} else if (check_arg_with_value(argv, i, "--ssh-known-hosts-file", argval, true)) {
sourceConfig.knownHostsFile = targetConfig.knownHostsFile = argval;
} else if (check_arg_with_value(argv, i, "--ssh-config-file", argval, true)) {
sourceConfig.configFile = targetConfig.configFile = argval;
} else if (strcmp(argv[i], "--version") == 0) {
const char *type = APP_EDITION_NAME;
if (strcmp(APP_EDITION_NAME, "Community") ==
(0)) // Extra parens to silence warning.
type = "CE";
printf("%s %s (%s) %i.%i.%i %s build %i\n",
base::basename(argv[0]).c_str(), type, APP_LICENSE_TYPE,
APP_MAJOR_NUMBER, APP_MINOR_NUMBER, APP_RELEASE_NUMBER,
APP_RELEASE_TYPE, APP_BUILD_NUMBER);
exit(0);
} else if (strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "-h") == 0) {
show_help();
exit(0);
} else if (check_arg_with_value(argv, i, "--target", argval, true)) {
target_connstring = base::trim(argval, "\"");
} else if (check_arg_with_value(argv, i, "--table-file", argval, true))
table_file = argval;
else if (strcmp(argv[i], "--table") == 0) {
TableParam param;
if ((!count_only && i + 7 >= argc) || (count_only && i + 2 >= argc)) {
fprintf(stderr, "%s: Missing value for table copy specification\n",
argv[0]);
exit(1);
}
param.source_schema = argv[++i];
param.source_table = argv[++i];
if (!(count_only && !resume)) {
param.target_schema = argv[++i];
param.target_table = argv[++i];
if (std::strcmp(argv[++i], "-") != 0)
param.source_pk_columns = base::split(argv[i], ",", -1);
if (std::strcmp(argv[++i], "-") != 0)
param.target_pk_columns = base::split(argv[i], ",", -1);
param.select_expression = argv[++i];
trigger_schemas.insert(param.target_schema);
}
param.copy_spec.resume = resume;
param.copy_spec.max_count = max_count;
param.copy_spec.type = CopyAll;
tables.add_task(param);
} else if (strcmp(argv[i], "--table-range") == 0) {
TableParam param;
if ((!count_only && i + 10 >= argc) || (count_only && i + 5 >= argc)) {
fprintf(stderr, "%s: Missing value for table copy specification\n",
argv[0]);
exit(1);
}
param.source_schema = argv[++i];
param.source_table = argv[++i];
if (!(count_only && !resume)) {
param.target_schema = argv[++i];
param.target_table = argv[++i];
if (std::strcmp(argv[++i], "-") != 0)
param.source_pk_columns = base::split(argv[i], ",", -1);
if (std::strcmp(argv[++i], "-") != 0)
param.target_pk_columns = base::split(argv[i], ",", -1);
param.select_expression = argv[++i];
trigger_schemas.insert(param.target_schema);
}
param.copy_spec.range_key = argv[++i];
param.copy_spec.range_start = base::atoi<long long>(argv[++i], 0ll);
param.copy_spec.range_end = base::atoi<long long>(argv[++i], 0ll);
param.copy_spec.type = CopyRange;
tables.add_task(param);
} else if (strcmp(argv[i], "--table-row-count") == 0) {
TableParam param;
if ((!count_only && i + 8 >= argc) || (count_only && i + 3 >= argc)) {
fprintf(stderr, "%s: Missing value for table copy specification\n",
argv[0]);
exit(1);
}
param.source_schema = argv[++i];
param.source_table = argv[++i];
if (!(count_only && !resume)) {
param.target_schema = argv[++i];
param.target_table = argv[++i];
if (std::strcmp(argv[++i], "-") != 0)
param.source_pk_columns = base::split(argv[i], ",", -1);
if (std::strcmp(argv[++i], "-") != 0)
param.target_pk_columns = base::split(argv[i], ",", -1);
param.select_expression = argv[++i];
}
param.copy_spec.row_count = base::atoi<long long>(argv[++i], 0ll);
param.copy_spec.resume = resume;
param.copy_spec.type = CopyCount;
tables.add_task(param);
} else if (check_arg_with_value(argv, i, "--source-rdbms-type", argval,
false))
source_rdbms_type = argval;
else if (strcmp(argv[i], "--table-where") == 0) {
TableParam param;
if ((!count_only && i + 8 >= argc) || (count_only && i + 4 >= argc)) {
fprintf(stderr, "%s: Missing value for table copy specification\n", argv[0]);
exit(1);
}
param.source_schema = argv[++i];
param.source_table = argv[++i];
if (!(count_only && !resume)) {
param.target_schema = argv[++i];
param.target_table = argv[++i];
if (std::strcmp(argv[++i], "-") != 0)
param.source_pk_columns = base::split(argv[i], ",", -1);
if (std::strcmp(argv[++i], "-") != 0)
param.target_pk_columns = base::split(argv[i], ",", -1);
param.select_expression = argv[++i];
param.copy_spec.where_expression = argv[++i];
trigger_schemas.insert(param.target_schema);
} else {
param.select_expression = argv[++i];
param.copy_spec.where_expression = argv[++i];
}
param.copy_spec.type = CopyWhere;
tables.add_task(param);
} else if (check_arg_with_value(argv, i, "--max-count", argval, true)) {
max_count = base::atoi<int>(argval, 0);
} else if (strcmp(argv[i], "--source-use-cleartext") == 0)
source_use_cleartext_plugin = true;
else if (strcmp(argv[i], "--target-use-cleartext") == 0)
target_use_cleartext_plugin = true;
else if (check_arg_with_value(argv, i, "--target-timeout", argval, true))
target_connection_timeout = base::atoi<int>(argval, 0);
else if (check_arg_with_value(argv, i, "--source-timeout", argval, true))
source_connection_timeout = base::atoi<int>(argval, 0);
else {
fprintf(stderr, "%s: Invalid option %s\n", argv[0], argv[i]);
exit(1);
}
i++;
}
// Creates the log to the target file if any, if not
// uses std_error
base::Logger logger(true, log_file);
if (!log_level.empty()) {
if (!set_log_level(log_level)) {
fprintf(stderr, "%s: invalid argument '%s' for option %s\n", argv[0], log_level.data(), "--log-level");
exit(1);
} else
log_level_set = true;
}
// Set the log level from environment var WB_LOG_LEVEL if specified or set a
// default log level.
if (!log_level_set) {
const char *log_setting = getenv("WB_LOG_LEVEL");
if (log_setting == NULL)
log_setting = "info";
else
log_level_set = true;
std::string level = base::tolower(log_setting);
base::Logger::active_level(level);
}
// If needed, reads the tasks from the table definition file
if (!table_file.empty()) {
if (!read_tasks_from_file(table_file, count_only, tables, trigger_schemas, resume, max_count)) {
fprintf(stderr, "Invalid table definitions format in file: %s\n", table_file.data());
exit(1);
}
}
// Not having the source connection data is an error unless
// the standalone operations to disable or reenable triggers
// are called
if (source_connstring.empty() && !reenable_triggers && !disable_triggers) {
fprintf(stderr, "Missing source DB server\n");
exit(1);
}
if (target_connstring.empty() && !(count_only && !resume)) {
fprintf(stderr, "Missing target DB server\n");
exit(1);
}
// Table definitions will be required only if the standalone operations to
// Reenable or disable triggers are not called
if (tables.empty() && !reenable_triggers && !disable_triggers) {
logWarning("Missing table list specification\n");
exit(0);
}
std::string source_host;
std::string source_user;
int source_port = -1;
std::string source_socket;
// Source connection is parsed only when NOT executing the
// Standalone operatios on triggers
if (source_type == ST_MYSQL && !reenable_triggers && !disable_triggers) {
if (!parse_mysql_connstring(source_connstring, source_user, source_password,
source_host, source_port, source_socket)) {
fprintf(stderr, "Invalid MySQL connection string %s for source database. "
"Must be in format user[:pass]@host:port or "
"user[:pass]@::socket\n",
source_connstring.c_str());
exit(1);
} else {
sourceConfig.remotehost = source_host;
sourceConfig.remoteport = source_port;
}
}
std::string target_host;
std::string target_user;
int target_port = -1;
std::string target_socket;
if (!(count_only && !resume) &&
!parse_mysql_connstring(target_connstring, target_user, target_password,
target_host, target_port, target_socket)) {
fprintf(stderr, "Invalid MySQL connection string %s for target database. "
"Must be in format user[:pass]@host:port or "
"user[:pass]@::socket\n",
target_connstring.c_str());
exit(1);
} else {
targetConfig.remotehost = target_host;
targetConfig.remoteport = target_port;
}
if (passwords_from_stdin) {
char password[200];
if (!fgets(password, sizeof(password), stdin)) {
logError("Error reading passwords from stdin\n");
exit(1);
}
if ((count_only && !resume) || reenable_triggers || disable_triggers) {
char *ptr = strtok(password, "\t\r\n");
if (ptr) {
if (count_only)
source_password = ptr;
else
target_password = ptr;
}
} else {
char *ptr = strtok(password, "\r\n");
if (ptr) {
ptr = strchr(password, '\t');
if (ptr) {
source_password = std::string(password, ptr - password);
target_password = ptr + 1;
} else
source_password = password;
}
}
}
static SQLHENV odbc_env;
ssh::SSHTunnelManager *manager = nullptr;
if (!sourceConfig.remoteSSHhost.empty() || !targetConfig.remoteSSHhost.empty()) {
manager = new ssh::SSHTunnelManager();
manager->start();
if (!sourceConfig.remoteSSHhost.empty()) {
source_port = createTunnel(sourceConfig, sourceCredentials, manager);
source_host = "127.0.0.1";
}
if (!targetConfig.remoteSSHhost.empty()) {
target_port = createTunnel(targetConfig, targetCredentials, manager);
target_host = "127.0.0.1";
}
}
PyThreadState *state = NULL;
if (source_type == ST_PYTHON) {
Py_Initialize();
//PyEval_InitThreads();
state = PyEval_SaveThread();
}
try {
if (count_only) {
std::unique_ptr<CopyDataSource> psource;
if (source_type == ST_ODBC) {
SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &odbc_env);
SQLSetEnvAttr(odbc_env, SQL_ATTR_ODBC_VERSION, (void *)SQL_OV_ODBC3, 0);
psource.reset(new ODBCCopyDataSource(odbc_env, source_connstring,
source_password, source_is_utf8,
source_rdbms_type));
} else if (source_type == ST_MYSQL)
psource.reset(new MySQLCopyDataSource(
source_host, source_port, source_user, source_password,
source_socket, source_use_cleartext_plugin,
source_connection_timeout));
else
psource.reset(
new PythonCopyDataSource(source_connstring, source_password));
std::unique_ptr<MySQLCopyDataTarget> ptarget;
TableParam task;
while (tables.get_task(task)) {
std::vector<std::string> last_pkeys;
if (task.copy_spec.resume) {
if (!ptarget.get())
ptarget.reset(new MySQLCopyDataTarget(
target_host, target_port, target_user, target_password,
target_socket, target_use_cleartext_plugin, app_name,
source_charset, source_rdbms_type, target_connection_timeout));
last_pkeys = ptarget->get_last_pkeys(task.target_pk_columns, task.target_schema, task.target_table);
}
count_rows(psource, task.source_schema, task.source_table, task.source_pk_columns, task.copy_spec, last_pkeys);
}
} else if (reenable_triggers || disable_triggers) {
std::unique_ptr<MySQLCopyDataTarget> ptarget;
ptarget.reset(new MySQLCopyDataTarget(
target_host, target_port, target_user, target_password, target_socket,
target_use_cleartext_plugin, app_name, source_charset,
source_rdbms_type, target_connection_timeout));
if (disable_triggers)
ptarget->backup_triggers(trigger_schemas);
else
ptarget->restore_triggers(trigger_schemas);
} else {
std::vector<CopyDataTask *> threads;
std::unique_ptr<MySQLCopyDataTarget> ptarget_conn;
MySQLCopyDataTarget *ptarget = NULL;
CopyDataSource *psource = NULL;
if (disable_triggers_on_copy) {
ptarget_conn.reset(new MySQLCopyDataTarget(
target_host, target_port, target_user, target_password,
target_socket, target_use_cleartext_plugin, app_name,
source_charset, source_rdbms_type, target_connection_timeout));
ptarget_conn->backup_triggers(trigger_schemas);
}
for (int index = 0; index < thread_count; index++) {
if (source_type == ST_ODBC) {
SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &odbc_env);
SQLSetEnvAttr(odbc_env, SQL_ATTR_ODBC_VERSION, (void *)SQL_OV_ODBC3, 0);
psource = new ODBCCopyDataSource(odbc_env, source_connstring,
source_password, source_is_utf8,
source_rdbms_type);
} else if (source_type == ST_MYSQL)
psource = new MySQLCopyDataSource(
source_host, source_port, source_user, source_password,
source_socket, source_use_cleartext_plugin,
source_connection_timeout);
else
psource = new PythonCopyDataSource(source_connstring, source_password);
ptarget = new MySQLCopyDataTarget(
target_host, target_port, target_user, target_password,
target_socket, target_use_cleartext_plugin, app_name,
source_charset, source_rdbms_type, target_connection_timeout);
psource->set_max_blob_chunk_size(ptarget->get_max_allowed_packet());
psource->set_max_parameter_size((unsigned long)ptarget->get_max_long_data_size());
psource->set_abort_on_oversized_blobs(abort_on_oversized_blobs);
ptarget->set_truncate(truncate_target);
if (max_count > 0)
bulk_insert_batch = max_count;
ptarget->set_bulk_insert_batch_size((int)bulk_insert_batch);
if (check_types_only) {
// XXXX
delete psource;
} else {
threads.push_back(new CopyDataTask(base::strfmt("Task %d", index + 1),
psource, ptarget, &tables,
show_progress));
}
}
// Waits for all the threads to complete
for (size_t index = 0; index < threads.size(); index++)
threads[index]->wait();
// Finally destroys the threads and connections
for (size_t index = 0; index < threads.size(); index++)
delete threads[index];
// Finally restores the triggers
if (disable_triggers_on_copy)
ptarget_conn->restore_triggers(trigger_schemas);
}
} catch (std::exception &e) {
logError("Exception: %s\n", e.what());
if (source_type == ST_PYTHON) {
PyEval_RestoreThread(state);
Py_Finalize();
}
exit(1);
}
if (source_type == ST_PYTHON) {
PyEval_RestoreThread(state);
Py_Finalize();
}
if (manager) {
manager->setStop();
manager->pokeWakeupSocket();
if (manager->isRunning())
manager->join();
delete manager;
manager = nullptr;
}
printf("FINISHED\n");
fflush(stdout);
return 0;
}