storage/ndb/tools/ndb_move_data.cpp (322 lines of code) (raw):
/* Copyright (c) 2013, 2017, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#include <ndb_global.h>
#include <ndb_opts.h>
#include <NdbOut.hpp>
#include <NdbApi.hpp>
#include <NDBT.hpp>
#include <NdbSleep.h>
#include <ndb_limits.h>
#include <ndb_lib_move_data.hpp>
static const char* opt_dbname = "TEST_DB";
static bool opt_exclude_missing_columns = false;
static bool opt_promote_attributes = false;
static bool opt_lossy_conversions = false;
static const char* opt_staging_tries = 0;
static bool opt_drop_source = false;
static bool opt_verbose = false;
static bool opt_error_insert = false;
static bool opt_abort_on_error = false;
static char g_staging_tries_default[100];
static Ndb_move_data::Opts::Tries g_opts_tries;
static char g_source[MAX_TAB_NAME_SIZE];
static char g_sourcedb[MAX_TAB_NAME_SIZE];
static char g_sourcename[MAX_TAB_NAME_SIZE];
static char g_target[MAX_TAB_NAME_SIZE];
static char g_targetdb[MAX_TAB_NAME_SIZE];
static char g_targetname[MAX_TAB_NAME_SIZE];
static Ndb_cluster_connection* g_ncc = 0;
static Ndb* g_ndb = 0;
static NdbDictionary::Dictionary* g_dic = 0;
static const NdbDictionary::Table* g_sourcetab = 0;
static const NdbDictionary::Table* g_targettab = 0;
#define CHK1(b) \
if (!(b)) { \
ret = -1; \
break; \
}
#define CHK2(b, e) \
if (!(b)) { \
g_err << "ERR: " << #b << " failed at line " << __LINE__ \
<< ": " << e << endl; \
ret = -1; \
break; \
}
inline void ndb_end_and_exit(int exitcode)
{
ndb_end(0);
exit(exitcode);
}
static NdbError
getNdbError(Ndb_cluster_connection* ncc)
{
NdbError err;
err.code = ncc->get_latest_error();
err.message = ncc->get_latest_error_msg();
return err;
}
static int
doconnect()
{
int ret = 0;
do
{
g_ncc = new Ndb_cluster_connection(opt_ndb_connectstring);
CHK2(g_ncc->connect(6, 5) == 0, getNdbError(g_ncc));
CHK2(g_ncc->wait_until_ready(30, 10) == 0, getNdbError(g_ncc));
g_ndb = new Ndb(g_ncc, opt_dbname);
CHK2(g_ndb->init() == 0, g_ndb->getNdbError());
CHK2(g_ndb->waitUntilReady(30) == 0, g_ndb->getNdbError());
g_dic = g_ndb->getDictionary();
g_info << "connected" << endl;
}
while (0);
return ret;
}
static void
dodisconnect()
{
delete g_ndb;
delete g_ncc;
g_info << "disconnected" << endl;
}
static int
gettables()
{
int ret = 0;
do
{
CHK2(g_ndb->setDatabaseName(g_sourcedb) == 0, g_ndb->getNdbError());
g_sourcetab = g_dic->getTable(g_sourcename);
CHK2(g_sourcetab != 0, g_dic->getNdbError());
CHK2(g_ndb->setDatabaseName(g_targetdb) == 0, g_ndb->getNdbError());
g_targettab = g_dic->getTable(g_targetname);
CHK2(g_targettab != 0, g_dic->getNdbError());
}
while (0);
return ret;
}
static int
domove()
{
int ret = 0;
do
{
Ndb_move_data md;
const Ndb_move_data::Stat& stat = md.get_stat();
CHK2(md.init(g_sourcetab, g_targettab) == 0, md.get_error());
int flags = 0;
if (opt_abort_on_error)
flags |= Ndb_move_data::Opts::MD_ABORT_ON_ERROR;
if (opt_exclude_missing_columns)
flags |= Ndb_move_data::Opts::MD_EXCLUDE_MISSING_COLUMNS;
if (opt_promote_attributes)
flags |= Ndb_move_data::Opts::MD_ATTRIBUTE_PROMOTION;
if (opt_lossy_conversions)
flags |= Ndb_move_data::Opts::MD_ATTRIBUTE_DEMOTION;
md.set_opts_flags(flags);
const Ndb_move_data::Opts::Tries& ot = g_opts_tries;
int tries = 0;
int delay = 0;
while (1)
{
CHK2(ot.maxtries == 0 || tries < ot.maxtries,
"too many temporary errors: " << tries);
tries++;
if (opt_error_insert)
md.error_insert();
if (md.move_data(g_ndb) != 0)
{
const Ndb_move_data::Error& error = md.get_error();
g_err
<< "move data "
<< (error.is_temporary() ? "temporary error" : "permanent error")
<< " at try " << tries << " of " << ot.maxtries
<< " at rows moved " << stat.rows_moved
<< " total " << stat.rows_total
<< ": " << error << endl;
CHK1(error.is_temporary());
if (stat.rows_moved == 0) // this try
delay *= 2;
else
delay /= 2;
if (delay < ot.mindelay)
delay = ot.mindelay;
if (delay > ot.maxdelay)
delay = ot.maxdelay;
g_info << "sleep " << delay << " ms" << endl;
NdbSleep_MilliSleep(delay); // XXX useless on last try
continue;
}
g_info << "moved all " << stat.rows_total << " rows"
<< " in " << tries << " tries" << endl;
if (opt_lossy_conversions ||
stat.truncated) // just in case
g_info << "truncated " << stat.truncated << " attribute values" << endl;
break;
}
CHK1(ret == 0);
}
while (0);
return ret;
}
static int
dodrop()
{
int ret = 0;
do
{
if (!opt_drop_source)
break;
CHK2(g_ndb->setDatabaseName(g_sourcedb) == 0, g_ndb->getNdbError());
CHK2(g_dic->dropTable(g_sourcename) == 0, g_dic->getNdbError());
}
while (0);
return ret;
}
static int
doall()
{
int ret = 0;
do
{
CHK1(doconnect() == 0);
CHK1(gettables() == 0);
CHK1(domove() == 0);
CHK1(dodrop() == 0);
}
while (0);
dodisconnect();
return ret;
}
static struct my_option
my_long_options[] =
{
NDB_STD_OPTS("ndb_move_data"),
{ "database", 'd',
"Default database of source and target tables",
(uchar**) &opt_dbname, (uchar**) &opt_dbname, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "exclude-missing-columns", NDB_OPT_NOSHORT,
"Ignore extra columns in source or target table",
(uchar **)&opt_exclude_missing_columns, (uchar **)&opt_exclude_missing_columns, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "promote-attributes", 'A',
"Allow attribute data to be converted to a larger type",
(uchar **)&opt_promote_attributes, (uchar **)&opt_promote_attributes, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "lossy-conversions", 'L',
"Allow attribute data to be truncated when converted to a smaller type",
(uchar **)&opt_lossy_conversions, (uchar **)&opt_lossy_conversions, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "staging-tries", NDB_OPT_NOSHORT,
"Specify tries on temporary errors."
" Format x[,y[,z]] where"
" x=maxtries (0=no limit) y=mindelay(ms) z=maxdelay(ms)",
(uchar**) &opt_staging_tries, (uchar**) &opt_staging_tries, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "drop-source", NDB_OPT_NOSHORT,
"Drop source table after all rows have been moved",
(uchar **)&opt_drop_source, (uchar **)&opt_drop_source, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "verbose", 'v',
"Verbose messages",
(uchar **)&opt_verbose, (uchar **)&opt_verbose, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "error-insert", NDB_OPT_NOSHORT,
"Insert random temporary errors (testing option)",
(uchar **)&opt_error_insert, (uchar **)&opt_error_insert, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "abort-on-error", NDB_OPT_NOSHORT,
"dump core on permanent error in move-data library (debug option)",
(uchar **)&opt_abort_on_error, (uchar **)&opt_abort_on_error, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ 0, 0,
0,
0, 0, 0,
GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }
};
const char*
load_default_groups[]= { "mysql_cluster", 0 };
static void
short_usage_sub(void)
{
ndb_short_usage_sub("source target ( [db.]table )");
}
static void
usage()
{
printf("%s: move rows from source table to target table\n", my_progname);
ndb_usage(short_usage_sub, load_default_groups, my_long_options);
}
static void
set_staging_tries_default()
{
Ndb_move_data::Opts::Tries ot;
Ndb_move_data::unparse_opts_tries(g_staging_tries_default, ot);
opt_staging_tries = g_staging_tries_default;
}
static int
checkopts(int argc, char** argv)
{
int ret = 0;
do
{
CHK2(Ndb_move_data::parse_opts_tries(opt_staging_tries, g_opts_tries) == 0,
"option --staging-tries has invalid value " << opt_staging_tries);
CHK2(argc == 2, "arguments are: source target");
memset(g_source, 0, MAX_TAB_NAME_SIZE);
memset(g_sourcedb, 0, MAX_TAB_NAME_SIZE);
memset(g_sourcename, 0, MAX_TAB_NAME_SIZE);
memset(g_target, 0, MAX_TAB_NAME_SIZE);
memset(g_targetdb, 0, MAX_TAB_NAME_SIZE);
memset(g_targetname, 0, MAX_TAB_NAME_SIZE);
CHK2(strlen(opt_dbname) < MAX_TAB_NAME_SIZE, "db name too long");
CHK2(strlen(argv[0]) < MAX_TAB_NAME_SIZE, "source name too long");
CHK2(strlen(argv[1]) < MAX_TAB_NAME_SIZE, "target name too long");
strcpy(g_source, argv[0]);
strcpy(g_target, argv[1]);
const char* p;
if ((p = strchr(g_source, '.')) == 0)
{
strcpy(g_sourcedb, opt_dbname);
strcpy(g_sourcename, g_source);
}
else
{
strncpy(g_sourcedb, g_source, p - g_source); // is null term
strcpy(g_sourcename, p + 1);
}
if ((p = strchr(g_target, '.')) == 0)
{
strcpy(g_targetdb, opt_dbname);
strcpy(g_targetname, g_target);
}
else
{
strncpy(g_targetdb, g_target, p - g_target); // is null term
strcpy(g_targetname, p + 1);
}
}
while (0);
return ret;
}
int
main(int argc, char** argv)
{
my_progname = "ndb_move_data";
set_staging_tries_default();
int ret;
ndb_init();
ndb_opt_set_usage_funcs(short_usage_sub, usage);
ret = handle_options(&argc, &argv, my_long_options, ndb_std_get_one_option);
if (ret != 0 || checkopts(argc, argv) != 0)
{
ndb_end_and_exit(NDBT_ProgramExit(NDBT_WRONGARGS));
}
setOutputLevel(opt_verbose ? 2 : 0);
ret = doall();
if (ret == -1)
{
ndb_end_and_exit(NDBT_ProgramExit(NDBT_FAILED));
}
ndb_end_and_exit(NDBT_ProgramExit(NDBT_OK));
}