storage/ndb/tools/ndb_lib_move_data.cpp (976 lines of code) (raw):

/* Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include <ndb_global.h> #include <NdbOut.hpp> #include <NdbApi.hpp> #include <NDBT.hpp> #include <ndb_limits.h> #include <ndb_lib_move_data.hpp> #include <ndb_rand.h> #define CHK1(b) \ if (!(b)) { \ ret = -1; \ abort_on_error(); \ break; \ } #define CHK2(b, e) \ if (!(b)) { \ set_error_line(__LINE__); \ set_error_code e; \ ret = -1; \ abort_on_error(); \ break; \ } Ndb_move_data::Ndb_move_data() { m_source = 0; m_target = 0; m_sourceattr = 0; m_targetattr = 0; m_data = 0; m_error_insert = false; } Ndb_move_data::~Ndb_move_data() { delete [] m_sourceattr; delete [] m_targetattr; m_sourceattr = 0; m_targetattr = 0; release_data(); } int Ndb_move_data::init(const NdbDictionary::Table* source, const NdbDictionary::Table* target) { int ret = 0; do { CHK2(source != 0, (Error::InvalidSource, "null source table pointer")); CHK2(source->getObjectStatus() == NdbDictionary::Object::Retrieved, (Error::InvalidSource, "source table status is not Retrieved")); CHK2(target != 0, (Error::InvalidTarget, "null target table pointer")); CHK2(target->getObjectStatus() == NdbDictionary::Object::Retrieved, (Error::InvalidTarget, "target table status is not Retrieved")); m_source = source; m_target = target; } while (0); return ret; } Ndb_move_data::Attr::Attr() { column = 0; name = 0; id = -1; map_id = -1; type = TypeNone; size_in_bytes = 0; length_bytes = 0; data_size = 0; pad_char = -1; equal = false; } void Ndb_move_data::set_type(Attr& attr, const NdbDictionary::Column* c) { attr.column = c; attr.name = c->getName(); attr.size_in_bytes = c->getSizeInBytes(); switch (c->getType()) { case NdbDictionary::Column::Char: case NdbDictionary::Column::Binary: attr.type = Attr::TypeArray; attr.length_bytes = 0; attr.data_size = attr.size_in_bytes; if (c->getType() == NdbDictionary::Column::Char) attr.pad_char = 0x20; else attr.pad_char = 0x0; break; case NdbDictionary::Column::Varchar: case NdbDictionary::Column::Varbinary: attr.type = Attr::TypeArray; attr.length_bytes = 1; require(attr.size_in_bytes >= attr.length_bytes); attr.data_size = attr.size_in_bytes - attr.length_bytes; attr.pad_char = -1; break; case NdbDictionary::Column::Longvarchar: case NdbDictionary::Column::Longvarbinary: attr.type = Attr::TypeArray; attr.length_bytes = 2; require(attr.size_in_bytes >= attr.length_bytes); attr.data_size = attr.size_in_bytes - attr.length_bytes; attr.pad_char = -1; break; case NdbDictionary::Column::Text: case NdbDictionary::Column::Blob: attr.type = Attr::TypeBlob; attr.length_bytes = 0; break; default: attr.type = Attr::TypeOther; break; } } Uint32 Ndb_move_data::calc_str_len_truncated(CHARSET_INFO *cs, char *data, Uint32 maxlen) { const char *begin = (const char*) data; const char *end= (const char*) (data+maxlen); int errors = 0; // for multi-byte characters, truncate to last well-formed character before // maxlen so that string is not truncated in the middle of a multi-byte char. Uint32 numchars = cs->cset->numchars(cs, begin, end); Uint32 wf_len = cs->cset->well_formed_len(cs, begin, end, numchars, &errors); require(wf_len <= maxlen); return wf_len; } int Ndb_move_data::check_nopk(const Attr& attr1, const Attr& attr2) { int ret = 0; const NdbDictionary::Column* c1 = attr1.column; const NdbDictionary::Column* c2 = attr2.column; do { CHK2(!c1->getPrimaryKey() && !c2->getPrimaryKey(), (Error::UnsupportedConversion, "cannot convert column #%d '%s'" ": primary key attributes not allowed here", 1+attr1.id, attr1.name)); } while (0); return ret; } int Ndb_move_data::check_promotion(const Attr& attr1, const Attr& attr2) { (void)attr2; int ret = 0; const Opts& opts = m_opts; do { CHK2(opts.flags & Opts::MD_ATTRIBUTE_PROMOTION, (Error::NoPromotionFlag, "cannot convert column #%d '%s'" ": promote-attributes has not been specified", 1+attr1.id, attr1.name)); } while (0); return ret; } int Ndb_move_data::check_demotion(const Attr& attr1, const Attr& attr2) { (void)attr2; int ret = 0; const Opts& opts = m_opts; do { CHK2(opts.flags & Opts::MD_ATTRIBUTE_DEMOTION, (Error::NoDemotionFlag, "cannot convert column #%d '%s'" ": demote-attributes has not been specified", 1+attr1.id, attr1.name)); } while (0); return ret; } int Ndb_move_data::check_sizes(const Attr& attr1, const Attr& attr2) { int ret = 0; do { if (attr1.data_size < attr2.data_size) { CHK1(check_promotion(attr1, attr2) == 0); } if (attr1.data_size > attr2.data_size) { CHK1(check_demotion(attr1, attr2) == 0); } } while (0); return ret; } int Ndb_move_data::check_unsupported(const Attr& attr1, const Attr& attr2) { (void)attr2; int ret = 0; do { CHK2(false, (Error::UnsupportedConversion, "cannot convert column #%d '%s'" ": unimplemented conversion", 1+attr1.id, attr1.name)); } while (0); return ret; } int Ndb_move_data::check_tables() { int ret = 0; const Opts& opts = m_opts; do { int attrcount1 = m_source->getNoOfColumns(); int attrcount2 = m_target->getNoOfColumns(); m_sourceattr = new Attr [attrcount1]; m_targetattr = new Attr [attrcount2]; // set type info, remap columns, check missing for (int i1 = 0; i1 < attrcount1; i1++) { Attr& attr1 = m_sourceattr[i1]; attr1.id = i1; const NdbDictionary::Column* c1 = m_source->getColumn(i1); require(c1 != 0); set_type(attr1, c1); const NdbDictionary::Column* c2 = m_target->getColumn(attr1.name); if (c2 == 0) { CHK2(opts.flags & Opts::MD_EXCLUDE_MISSING_COLUMNS, (Error::NoExcludeMissingFlag, "cannot convert source to target" ": source column #%d '%s' not found in target" " and exclude-missing-columns has not been specified", 1+i1, attr1.name)); } else { int i2 = c2->getColumnNo(); require(i2 >= 0 && i2 < attrcount2); Attr& attr2 = m_targetattr[i2]; attr1.map_id = i2; require(attr2.map_id == -1); attr2.map_id = i1; } } CHK1(ret == 0); for (int i2 = 0; i2 < attrcount2; i2++) { Attr& attr2 = m_targetattr[i2]; attr2.id = i2; const NdbDictionary::Column* c2 = m_target->getColumn(i2); require(c2 != 0); set_type(attr2, c2); const NdbDictionary::Column* c1 = m_source->getColumn(attr2.name); if (c1 == 0) { CHK2(opts.flags & Opts::MD_EXCLUDE_MISSING_COLUMNS, (Error::NoExcludeMissingFlag, "cannot convert source to target" ": target column #%d '%s' not found in source" " and exclude-missing-columns has not been specified", 1+i2, attr2.name)); } else { int i1 = c1->getColumnNo(); require(i2 >= 0 && i2 < attrcount2); Attr& attr1 = m_sourceattr[i1]; require(attr1.map_id == i2); require(attr2.map_id == i1); } } CHK1(ret == 0); // check conversion of non-excluded columns for (int i1 = 0; i1 < attrcount1; i1++) { Attr& attr1 = m_sourceattr[i1]; int i2 = attr1.map_id; if (i2 == -1) // excluded continue; Attr& attr2 = m_targetattr[i2]; { /* Exclude internal implementation details when comparing */ NdbDictionary::Column a1ColCopy(*attr1.column); NdbDictionary::Column a2ColCopy(*attr2.column); /* [Non] Dynamic internal storage is irrelevant */ a1ColCopy.setDynamic(false); a2ColCopy.setDynamic(false); if ((attr1.equal = attr2.equal = a1ColCopy.equal(a2ColCopy))) { continue; } } if (attr1.type == Attr::TypeArray && attr2.type == Attr::TypeBlob) { CHK1(check_nopk(attr1, attr2) == 0); CHK1(check_promotion(attr1, attr2) == 0); continue; } if (attr1.type == Attr::TypeBlob && attr2.type == Attr::TypeArray) { CHK1(check_nopk(attr1, attr2) == 0); CHK1(check_demotion(attr1, attr2) == 0); continue; } if (attr1.type == Attr::TypeArray && attr2.type == Attr::TypeArray) { CHK1(check_sizes(attr1, attr2) == 0); continue; } if (attr1.type == Attr::TypeBlob && attr2.type == Attr::TypeBlob) { // TEXT and BLOB conversions CHK1(check_sizes(attr1, attr2) == 0); continue; } CHK1(check_unsupported(attr1, attr2) == 0); } CHK1(ret == 0); } while (0); return ret; } Ndb_move_data::Data::Data() { data = 0; next = 0; } Ndb_move_data::Data::~Data() { delete [] data; data = 0; } char* Ndb_move_data::alloc_data(Uint32 n) { Data* d = new Data; d->data = new char [n]; d->next = m_data; m_data = d; return d->data; } void Ndb_move_data::release_data() { while (m_data != 0) { Data* d = m_data; m_data = d->next; delete d; } } Ndb_move_data::Op::Op() { ndb = 0; scantrans = 0; scanop = 0; updatetrans = 0; updateop = 0; values = 0; buflen = 32 * 1024; require(buflen >= (4 * MAX_TUPLE_SIZE_IN_WORDS)); buf1 = new char [buflen]; buf2 = new char [buflen]; rows_in_batch = 0; truncated_in_batch = 0; end_of_scan = false; } Ndb_move_data::Op::~Op() { delete [] values; delete [] buf1; delete [] buf2; values = 0; buf1 = 0; buf2 = 0; } int Ndb_move_data::start_scan() { int ret = 0; Op& op = m_op; const int attrcount1 = m_source->getNoOfColumns(); do { require(op.scantrans == 0); op.scantrans = op.ndb->startTransaction(m_source); CHK2(op.scantrans != 0, (op.ndb->getNdbError())); op.scanop = op.scantrans->getNdbScanOperation(m_source); CHK2(op.scanop != 0, (op.scantrans->getNdbError())); NdbOperation::LockMode lm = NdbOperation::LM_Exclusive; Uint32 flags = 0; CHK2(op.scanop->readTuples(lm, flags) == 0, (op.scanop->getNdbError())); require(op.values == 0); op.values = new Op::Value [attrcount1]; for (int i1 = 0; i1 < attrcount1; i1++) { const Attr& attr1 = m_sourceattr[i1]; if (attr1.type != Attr::TypeBlob) { NdbRecAttr* ra = op.scanop->getValue(i1); CHK2(ra != 0, (op.scanop->getNdbError())); op.values[i1].ra = ra; } else { NdbBlob* bh = op.scanop->getBlobHandle(i1); CHK2(bh != 0, (op.scanop->getNdbError())); op.values[i1].bh = bh; } } CHK1(ret == 0); CHK2(op.scantrans->execute(NdbTransaction::NoCommit) == 0, (op.scantrans->getNdbError())); } while (0); return ret; } /* * Copy one attribute value. nextResult() re-defines ra/bh * to point to the new row. Since we are batching, the data * must be saved to remain valid until execute. */ int Ndb_move_data::copy_other_to_other(const Attr& attr1, const Attr& attr2) { int ret = 0; Op& op = m_op; const int i1 = attr1.id; const int i2 = attr2.id; do { const NdbRecAttr* ra1 = op.values[i1].ra; require(ra1 != 0 && ra1->isNULL() != -1); if (ra1->isNULL()) { const char* value = 0; CHK2(op.updateop->setValue(i2, value) == 0, (op.updateop->getNdbError())); } else { const char* value = ra1->aRef(); CHK2(op.updateop->setValue(i2, value) == 0, (op.updateop->getNdbError())); } } while (0); return ret; } int Ndb_move_data::copy_data_to_array(const char* data1, const Attr& attr2, Uint32 length1, Uint32 length1x) { int ret = 0; const Opts& opts = m_opts; Op& op = m_op; const int i2 = attr2.id; do { if (data1 == 0) { CHK2(op.updateop->setValue(i2, data1) == 0, (op.updateop->getNdbError())); } else { /* * length1 is bytes present in data1 * length1x may be longer for blob source * see invocation from array and blob source */ require(length1 <= length1x); if (length1x > attr2.data_size) { require(opts.flags & Opts::MD_ATTRIBUTE_DEMOTION); length1 = attr2.data_size; op.truncated_in_batch++; } uchar* uptr = (uchar*)&op.buf2[0]; switch (attr2.length_bytes) { case 0: break; case 1: require(length1 <= 0xFF); uptr[0] = (uchar)length1; break; case 2: require(length1 <= 0xFFFF); uptr[0] = (uchar)(length1 & 0xFF); uptr[1] = (uchar)(length1 >> 8); break; default: require(false); break; } char* data2 = &op.buf2[attr2.length_bytes]; memcpy(data2, data1, length1); if (attr2.pad_char != -1) { char* pad_data2 = &data2[length1]; Uint32 pad_length2 = attr2.data_size - length1; memset(pad_data2, attr2.pad_char, pad_length2); } CHK2(op.updateop->setValue(i2, (char*)op.buf2) == 0, (op.updateop->getNdbError())); } } while (0); return ret; } int Ndb_move_data::copy_array_to_array(const Attr& attr1, const Attr& attr2) { int ret = 0; Op& op = m_op; const int i1 = attr1.id; do { const NdbRecAttr* ra1 = op.values[i1].ra; require(ra1 != 0 && ra1->isNULL() != -1); if (ra1->isNULL()) { const char* data1 = 0; CHK1(copy_data_to_array(data1, attr2, 0, 0) == 0); } else { Uint32 size1 = ra1->get_size_in_bytes(); require(size1 >= attr1.length_bytes); Uint32 length1 = size1 - attr1.length_bytes; const char* aref1 = ra1->aRef(); const char* data1 = &aref1[attr1.length_bytes]; if (attr1.length_bytes == 0) while (length1 != 0 && data1[length1-1] == attr1.pad_char) length1--; CHK1(copy_data_to_array(data1, attr2, length1, length1) == 0); } } while (0); return ret; } int Ndb_move_data::copy_array_to_blob(const Attr& attr1, const Attr& attr2) { int ret = 0; Op& op = m_op; const int i1 = attr1.id; const int i2 = attr2.id; do { const NdbRecAttr* ra1 = op.values[i1].ra; require(ra1 != 0 && ra1->isNULL() != -1); NdbBlob* bh2 = op.updateop->getBlobHandle(i2); CHK2(bh2 != 0, (op.updateop->getNdbError())); if (ra1->isNULL()) { CHK2(bh2->setValue(0, 0) == 0, (bh2->getNdbError())); } else { Uint32 size1 = ra1->get_size_in_bytes(); require(size1 >= attr1.length_bytes); Uint32 length1 = size1 - attr1.length_bytes; const char* aref1 = ra1->aRef(); const char* data1 = &aref1[attr1.length_bytes]; if (attr1.length_bytes == 0) while (length1 != 0 && data1[length1-1] == attr1.pad_char) length1--; char* data1copy = alloc_data(length1); memcpy(data1copy, data1, length1); CHK2(bh2->setValue(data1copy, length1) == 0, (bh2->getNdbError())); } } while (0); return ret; } int Ndb_move_data::copy_blob_to_array(const Attr& attr1, const Attr& attr2) { int ret = 0; Op& op = m_op; const int i1 = attr1.id; do { NdbBlob* bh1 = op.values[i1].bh; require(bh1 != 0); int isNull = -1; CHK2(bh1->getNull(isNull) == 0, (bh1->getNdbError())); require(isNull == 0 || isNull == 1); Uint64 length64 = ~(Uint64)0; CHK2(bh1->getLength(length64) == 0, (bh1->getNdbError())); Uint32 data_length = (Uint32)length64; require((uint64)data_length == length64); if (isNull) { const char* data1 = 0; CHK1(copy_data_to_array(data1, attr2, 0, 0) == 0); } else { char* data1 = &op.buf1[0]; Uint32 length1 = attr2.data_size; require(length1 <= (unsigned)op.buflen); // avoid buffer overflow CHK2(bh1->readData(data1, length1) == 0, (bh1->getNdbError())); // pass also real length to detect truncation CHK1(copy_data_to_array(data1, attr2, length1, data_length) == 0); } } while (0); return ret; } int Ndb_move_data::copy_blob_to_blob(const Attr& attr1, const Attr& attr2) { int ret = 0; Op& op = m_op; const int i1 = attr1.id; const int i2 = attr2.id; do { NdbBlob* bh1 = op.values[i1].bh; require(bh1 != 0); int isNull = -1; CHK2(bh1->getNull(isNull) == 0, (bh1->getNdbError())); require(isNull == 0 || isNull == 1); Uint64 length64 = ~(Uint64)0; CHK2(bh1->getLength(length64) == 0, (bh1->getNdbError())); Uint32 data_length = (Uint32)length64; require((uint64)data_length == length64); NdbBlob* bh2 = op.updateop->getBlobHandle(i2); CHK2(bh2 != 0, (op.updateop->getNdbError())); if (isNull) { CHK2(bh2->setValue(0, 0) == 0, (bh2->getNdbError())); } else { char* data = alloc_data(data_length); Uint32 bytes = data_length; CHK2(bh1->readData(data, bytes) == 0, (bh1->getNdbError())); require(bytes == data_length); // prevent TINYTEXT/TINYBLOB overflow by truncating data if(attr2.column->getPartSize() == 0) { Uint32 inline_size = attr2.column->getInlineSize(); if(bytes > inline_size) { data_length = calc_str_len_truncated(attr2.column->getCharset(), data, inline_size); op.truncated_in_batch++; } } CHK2(bh2->setValue(data, data_length) == 0, (bh2->getNdbError())); } } while (0); return ret; } int Ndb_move_data::copy_attr(const Attr& attr1, const Attr& attr2) { int ret = 0; require(attr1.map_id == attr2.id); do { if (attr1.equal && attr1.type != Attr::TypeBlob) { require(attr2.equal && attr2.type != Attr::TypeBlob); CHK1(copy_other_to_other(attr1, attr2) == 0); break; } if (attr1.type == Attr::TypeArray && attr2.type == Attr::TypeArray) { CHK1(copy_array_to_array(attr1, attr2) == 0); break; } if (attr1.type == Attr::TypeArray && attr2.type == Attr::TypeBlob) { CHK1(copy_array_to_blob(attr1, attr2) == 0); break; } if (attr1.type == Attr::TypeBlob && attr2.type == Attr::TypeArray) { CHK1(copy_blob_to_array(attr1, attr2) == 0); break; } if (attr1.type == Attr::TypeBlob && attr2.type == Attr::TypeBlob) { // handles TEXT and BLOB conversions CHK1(copy_blob_to_blob(attr1, attr2) == 0); break; } require(false); } while (0); return ret; } int Ndb_move_data::move_row() { int ret = 0; Op& op = m_op; const int attrcount1 = m_source->getNoOfColumns(); do { op.updateop = op.updatetrans->getNdbOperation(m_target); CHK2(op.updateop != 0, (op.updatetrans->getNdbError())); CHK2(op.updateop->insertTuple() == 0, (op.updateop->getNdbError())); for (int j = 0; j <= 1; j++) { for (int i1 = 0; i1 < attrcount1; i1++) { const Attr& attr1 = m_sourceattr[i1]; int i2 = attr1.map_id; if (unlikely(i2 == -1)) continue; const Attr& attr2 = m_targetattr[i2]; const NdbDictionary::Column* c = attr2.column; if (j == 0 && !c->getPrimaryKey()) continue; if (j == 1 && c->getPrimaryKey()) continue; CHK1(copy_attr(attr1, attr2) == 0); } CHK1(ret == 0); } CHK1(ret == 0); CHK2(op.scanop->deleteCurrentTuple(op.updatetrans) == 0, (op.scanop->getNdbError())); } while (0); return ret; } int Ndb_move_data::move_batch() { int ret = 0; Op& op = m_op; op.rows_in_batch = 0; op.truncated_in_batch = 0; do { int res; CHK2((res = op.scanop->nextResult(true)) != -1, (op.scanop->getNdbError())); require(res == 0 || res == 1); if (res == 1) { op.end_of_scan = true; op.ndb->closeTransaction(op.scantrans); op.scantrans = 0; break; } require(op.updatetrans == 0); op.updatetrans = op.ndb->startTransaction(); CHK2(op.updatetrans != 0, (op.ndb->getNdbError())); do { CHK1(move_row() == 0); op.rows_in_batch++; CHK2((res = op.scanop->nextResult(false)) != -1, (op.scanop->getNdbError())); require(res == 0 || res == 2); } while (res == 0); CHK1(ret == 0); if (m_error_insert && ndb_rand() % 5 == 0) { invoke_error_insert(); CHK1(false); } CHK2(op.updatetrans->execute(NdbTransaction::Commit) == 0, (op.updatetrans->getNdbError())); op.ndb->closeTransaction(op.updatetrans); op.updatetrans = 0; } while (0); release_data(); return ret; } int Ndb_move_data::move_data(Ndb* ndb) { int ret = 0; Op& op = m_op; Stat& stat = m_stat; stat.rows_moved = 0; // keep rows_total do { const NDB_TICKS now = NdbTick_getCurrentTicks(); ndb_srand((unsigned)now.getUint64()); reset_error(); CHK2(m_source != 0 && m_target != 0, (Error::InvalidState, "source / target not defined")); op.ndb = ndb; CHK1(m_error.code == 0); CHK1(check_tables() == 0); CHK1(start_scan() == 0); while (1) { CHK1(move_batch() == 0); stat.rows_moved += op.rows_in_batch; stat.rows_total += op.rows_in_batch; stat.truncated += op.truncated_in_batch; require(op.end_of_scan == (op.rows_in_batch == 0)); if (op.end_of_scan) break; } CHK1(ret == 0); } while (0); close_op(ndb, ret); return ret; } void Ndb_move_data::close_op(Ndb* ndb, int ret) { Op& op = m_op; if (ret == 0) { require(op.scantrans == 0); require(op.updatetrans == 0); } else { if (op.scantrans != 0) { ndb->closeTransaction(op.scantrans); op.scantrans = 0; } if (op.updatetrans != 0) { ndb->closeTransaction(op.updatetrans); op.updatetrans = 0; } } delete [] op.values; op.values = 0; } Ndb_move_data::Opts::Opts() { flags = 0; } void Ndb_move_data::set_opts_flags(int flags) { Opts& opts = m_opts; opts.flags = flags; } void Ndb_move_data::unparse_opts_tries(char* opt, const Opts::Tries& ot) { sprintf(opt, "%d,%d,%d", ot.maxtries, ot.mindelay, ot.maxdelay); } static int parse_opts_tries_field(const char*& t, int& out) { char* u = 0; out = (int)strtol(t, &u, 10); if (t == u) return -1; // empty if (out < 0) return -1; // bad value if (*u == 0) return 0; // last one if (*u != ',') return -1; // bad char t = u + 1; return 1; // more after this one } int Ndb_move_data::parse_opts_tries(const char* s, Opts::Tries& ot) { Opts::Tries out; // a copy so nothing is set on error int ret; if (s == 0) return -1; const char* t = s; if ((ret = parse_opts_tries_field(t, out.maxtries)) > 0) if ((ret = parse_opts_tries_field(t, out.mindelay)) > 0) if ((ret = parse_opts_tries_field(t, out.maxdelay)) > 0) return -1; // too many if (ret < 0) return -1; if (out.mindelay > out.maxdelay) return -1; ot = out; return 0; } Ndb_move_data::Stat::Stat() { rows_moved = 0; rows_total = 0; truncated = 0; } const Ndb_move_data::Stat& Ndb_move_data::get_stat() { return m_stat; } Ndb_move_data::Error::Error() { line = 0; code = 0; memset(message, 0, sizeof(message)); } bool Ndb_move_data::Error::is_temporary() const { return code > 0 && ndberror.status == NdbError::TemporaryError; } const Ndb_move_data::Error& Ndb_move_data::get_error() { return m_error; } void Ndb_move_data::set_error_line(int line) { Error& e = m_error; e.line = line; } void Ndb_move_data::set_error_code(int code, const char* fmt, ...) { va_list ap; va_start(ap, fmt); require(code != 0); Error& e = m_error; e.code = code; my_vsnprintf(e.message, sizeof(e.message), fmt, ap); va_end(ap); } void Ndb_move_data::set_error_code(const NdbError& ndberror) { Error& e = m_error; set_error_code(ndberror.code, "%s", ndberror.message); e.ndberror = ndberror; } void Ndb_move_data::reset_error() { new (&m_error) Error; } NdbOut& operator<<(NdbOut& out, const Ndb_move_data::Error& error) { if (error.code < 0) out << "move data error " << error.code << ": " << error.message; else if (error.code > 0) out << "ndb error" << ": " << error.ndberror; else out << "no error"; out << " (at lib line " << error.line << ")" << endl; return out; } void Ndb_move_data::error_insert() { m_error_insert = true; } void Ndb_move_data::invoke_error_insert() { NdbError ndberror; ndberror.code = 9999; ndberror.status = NdbError::TemporaryError; ndberror.message = "Error insert"; set_error_line(0); set_error_code(ndberror); m_error_insert = false; } void Ndb_move_data::abort_on_error() { const Opts& opts = m_opts; require(!(opts.flags & Opts::MD_ABORT_ON_ERROR)); }