in sql/ha_ndbcluster_binlog.cc [1801:2269]
/*
  Distribute a schema operation to the other mysqld servers by writing a
  row describing it into the table NDB_SCHEMA_TABLE in database NDB_REP_DB,
  and then wait until ndb_schema_object->slock_bitmap is empty, the
  subscribers have gone away, the thread is killed or the wait times out.

  @param thd                 Thread handle of the session doing the operation
  @param query               Statement text stored in the query column; for
                             SOT_DROP_TABLE and SOT_RENAME_TABLE it is replaced
                             by a statement rebuilt for this single table
  @param query_length        Length of query in bytes
  @param db                  Database name, stored in the db key column
  @param table_name          Table name, stored in the name key column
  @param ndb_table_id        Stored in the id column and in the schema object
  @param ndb_table_version   Stored in the version column and in the schema
                             object
  @param type                Kind of schema operation; an unknown value aborts
  @param new_db              New database name, only read for SOT_RENAME_TABLE
  @param new_table_name      New table name, only read for SOT_RENAME_TABLE

  @return 1 if the Thd_ndb or the schema object could not be obtained,
          otherwise 0. Note that a failure to write the row is reported as
          a warning and in the error log, but 0 is still returned.
*/
int ndbcluster_log_schema_op(THD *thd,
                             const char *query, int query_length,
                             const char *db, const char *table_name,
                             uint32 ndb_table_id,
                             uint32 ndb_table_version,
                             enum SCHEMA_OP_TYPE type,
                             const char *new_db, const char *new_table_name)
{
  DBUG_ENTER("ndbcluster_log_schema_op");
  Thd_ndb *thd_ndb= get_thd_ndb(thd);
  if (!thd_ndb)
  {
    /* First NDB use in this session, create the Thd_ndb on demand */
    if (!(thd_ndb= Thd_ndb::seize(thd)))
    {
      sql_print_error("Could not allocate Thd_ndb object");
      DBUG_RETURN(1);
    }
    thd_set_thd_ndb(thd, thd_ndb);
  }

  DBUG_PRINT("enter",
             ("query: %s db: %s table_name: %s thd_ndb->options: %d",
              query, db, table_name, thd_ndb->options));

  /* Nothing to do without a schema share or when logging is turned off */
  if (!ndb_schema_share || thd_ndb->options & TNO_NO_LOG_SCHEMA_OP)
  {
    if (thd->slave_thread)
      update_slave_api_stats(thd_ndb->ndb);
    DBUG_RETURN(0);
  }

  /*
    Buffer for the rewritten "drop table" / "rename table" statement.
    It is sized for the longest text built below: the rename statement
    with four quoted identifiers, each as large as the quoted_* buffers
    allow. strxmov() does not check the destination size, so a buffer
    of only FN_REFLEN bytes could be overrun by long identifiers.
  */
  char tmp_buf2[sizeof("rename table ") + sizeof(" to ") + 2 +
                4 * (2 + 2 * FN_REFLEN)];
  char quoted_table1[2 + 2 * FN_REFLEN + 1];
  char quoted_db1[2 + 2 * FN_REFLEN + 1];
  char quoted_db2[2 + 2 * FN_REFLEN + 1];
  char quoted_table2[2 + 2 * FN_REFLEN + 1];
  int id_length= 0;
  const char *type_str;
  /* Set when this node's own bit should also be put in the slock bitmap */
  int also_internal= 0;
  uint32 log_type= (uint32)type;

  switch (type)
  {
  case SOT_DROP_TABLE:
    /* drop database command, do not log at drop table */
    if (thd->lex->sql_command == SQLCOM_DROP_DB)
      DBUG_RETURN(0);
    /* redo the drop table query as it may contain several tables */
    query= tmp_buf2;
    id_length= my_strmov_quoted_identifier (thd, (char *) quoted_table1,
                                            table_name, 0);
    quoted_table1[id_length]= '\0';
    id_length= my_strmov_quoted_identifier (thd, (char *) quoted_db1,
                                            db, 0);
    quoted_db1[id_length]= '\0';
    query_length= (uint) (strxmov(tmp_buf2, "drop table ", quoted_db1, ".",
                                  quoted_table1, NullS) - tmp_buf2);
    type_str= "drop table";
    break;
  case SOT_RENAME_TABLE_PREPARE:
    type_str= "rename table prepare";
    also_internal= 1;
    break;
  case SOT_RENAME_TABLE:
    /* redo the rename table query as it may contain several tables */
    query= tmp_buf2;
    id_length= my_strmov_quoted_identifier (thd, (char *) quoted_db1,
                                            db, 0);
    quoted_db1[id_length]= '\0';
    id_length= my_strmov_quoted_identifier (thd, (char *) quoted_table1,
                                            table_name, 0);
    quoted_table1[id_length]= '\0';
    id_length= my_strmov_quoted_identifier (thd, (char *) quoted_db2,
                                            new_db, 0);
    quoted_db2[id_length]= '\0';
    id_length= my_strmov_quoted_identifier (thd, (char *) quoted_table2,
                                            new_table_name, 0);
    quoted_table2[id_length]= '\0';
    query_length= (uint) (strxmov(tmp_buf2, "rename table ",
                                  quoted_db1, ".", quoted_table1, " to ",
                                  quoted_db2, ".", quoted_table2, NullS) - tmp_buf2);
    type_str= "rename table";
    break;
  case SOT_CREATE_TABLE:
    type_str= "create table";
    break;
  case SOT_ALTER_TABLE_COMMIT:
    type_str= "alter table";
    also_internal= 1;
    break;
  case SOT_ONLINE_ALTER_TABLE_PREPARE:
    type_str= "online alter table prepare";
    also_internal= 1;
    break;
  case SOT_ONLINE_ALTER_TABLE_COMMIT:
    type_str= "online alter table commit";
    also_internal= 1;
    break;
  case SOT_DROP_DB:
    type_str= "drop db";
    break;
  case SOT_CREATE_DB:
    type_str= "create db";
    break;
  case SOT_ALTER_DB:
    type_str= "alter db";
    break;
  case SOT_TABLESPACE:
    type_str= "tablespace";
    break;
  case SOT_LOGFILE_GROUP:
    type_str= "logfile group";
    break;
  case SOT_TRUNCATE_TABLE:
    type_str= "truncate table";
    break;
  case SOT_CREATE_USER:
    type_str= "create user";
    break;
  case SOT_DROP_USER:
    type_str= "drop user";
    break;
  case SOT_RENAME_USER:
    type_str= "rename user";
    break;
  case SOT_GRANT:
    type_str= "grant/revoke";
    break;
  case SOT_REVOKE:
    type_str= "revoke all";
    break;
  default:
    abort(); /* should not happen, programming error */
  }

  /* Get the schema object keyed on the table's file name */
  NDB_SCHEMA_OBJECT *ndb_schema_object;
  {
    char key[FN_REFLEN + 1];
    build_table_filename(key, sizeof(key) - 1, db, table_name, "", 0);
    ndb_schema_object= ndb_get_schema_object(key, TRUE, FALSE);
    if (!ndb_schema_object)
    {
      /*
        The object is tested for NULL further down, so a NULL result is
        taken to be possible; bail out here instead of dereferencing it.
      */
      sql_print_error("NDB %s: could not get schema object for %s",
                      type_str, key);
      DBUG_RETURN(1);
    }
    ndb_schema_object->table_id= ndb_table_id;
    ndb_schema_object->table_version= ndb_table_version;
  }

  const NdbError *ndb_error= 0;
  uint32 node_id= g_ndb_cluster_connection->node_id();
  Uint64 epoch= 0;

  /*
    Build the slock bitmap as the union of the subscriber bitmaps of
    all storage nodes
  */
  {
    int i;
    int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();

    /* begin protect ndb_schema_share */
    pthread_mutex_lock(&ndb_schema_share_mutex);
    if (ndb_schema_share == 0)
    {
      /* The schema share went away after the unlocked check above */
      pthread_mutex_unlock(&ndb_schema_share_mutex);
      if (ndb_schema_object)
        ndb_free_schema_object(&ndb_schema_object, FALSE);
      DBUG_RETURN(0);
    }
    pthread_mutex_lock(&ndb_schema_share->mutex);
    for (i= 0; i < no_storage_nodes; i++)
    {
      bitmap_union(&ndb_schema_object->slock_bitmap,
                   &ndb_schema_share->subscriber_bitmap[i]);
    }
    pthread_mutex_unlock(&ndb_schema_share->mutex);
    pthread_mutex_unlock(&ndb_schema_share_mutex);
    /* end protect ndb_schema_share */

    /* Include or exclude this node's own bit depending on operation type */
    if (also_internal)
      bitmap_set_bit(&ndb_schema_object->slock_bitmap, node_id);
    else
      bitmap_clear_bit(&ndb_schema_object->slock_bitmap, node_id);

    DBUG_DUMP("schema_subscribers", (uchar*)&ndb_schema_object->slock,
              no_bytes_in_map(&ndb_schema_object->slock_bitmap));
  }

  Ndb *ndb= thd_ndb->ndb;
  /* The current database name is restored before returning */
  char save_db[FN_REFLEN];
  strcpy(save_db, ndb->getDatabaseName());

  char tmp_buf[FN_REFLEN];
  NDBDICT *dict= ndb->getDictionary();
  ndb->setDatabaseName(NDB_REP_DB);
  Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
  const NDBTAB *ndbtab= ndbtab_g.get_table();
  NdbTransaction *trans= 0;
  int retries= 100;
  int retry_sleep= 30; /* 30 milliseconds, transaction */
  const NDBCOL *col[SCHEMA_SIZE];
  unsigned sz[SCHEMA_SIZE];

  if (ndbtab == 0)
  {
    /*
      The schema table could not be opened. This is only treated as an
      error when the operation is not on the schema table itself.
    */
    if (strcmp(NDB_REP_DB, db) != 0 ||
        strcmp(NDB_SCHEMA_TABLE, table_name))
    {
      ndb_error= &dict->getNdbError();
    }
    goto end;
  }

  /* Look up the columns and, except for the query blob, their lengths */
  {
    uint i;
    for (i= 0; i < SCHEMA_SIZE; i++)
    {
      col[i]= ndbtab->getColumn(i);
      if (i != SCHEMA_QUERY_I)
      {
        sz[i]= col[i]->getLength();
        DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
      }
    }
  }

  /* Write the row, retrying on temporary errors */
  while (1)
  {
    const char *log_db= db;
    const char *log_tab= table_name;
    const char *log_subscribers= (char*)ndb_schema_object->slock;

    if ((trans= ndb->startTransaction()) == 0)
      goto err;

    /* Single pass block defining the write operation */
    while (1)
    {
      NdbOperation *op= 0;
      int r= 0;
      r|= (op= trans->getNdbOperation(ndbtab)) == 0;
      DBUG_ASSERT(r == 0);
      r|= op->writeTuple();
      DBUG_ASSERT(r == 0);

      /* db */
      ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, log_db, strlen(log_db));
      r|= op->equal(SCHEMA_DB_I, tmp_buf);
      DBUG_ASSERT(r == 0);
      /* name */
      ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, log_tab,
                       strlen(log_tab));
      r|= op->equal(SCHEMA_NAME_I, tmp_buf);
      DBUG_ASSERT(r == 0);
      /* slock */
      DBUG_ASSERT(sz[SCHEMA_SLOCK_I] ==
                  no_bytes_in_map(&ndb_schema_object->slock_bitmap));
      r|= op->setValue(SCHEMA_SLOCK_I, log_subscribers);
      DBUG_ASSERT(r == 0);
      /* query */
      {
        NdbBlob *ndb_blob= op->getBlobHandle(SCHEMA_QUERY_I);
        DBUG_ASSERT(ndb_blob != 0);
        uint blob_len= query_length;
        const char* blob_ptr= query;
        r|= ndb_blob->setValue(blob_ptr, blob_len);
        DBUG_ASSERT(r == 0);
      }
      /* node_id */
      r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
      DBUG_ASSERT(r == 0);
      /* epoch */
      r|= op->setValue(SCHEMA_EPOCH_I, epoch);
      DBUG_ASSERT(r == 0);
      /* id */
      r|= op->setValue(SCHEMA_ID_I, ndb_table_id);
      DBUG_ASSERT(r == 0);
      /* version */
      r|= op->setValue(SCHEMA_VERSION_I, ndb_table_version);
      DBUG_ASSERT(r == 0);
      /* type */
      r|= op->setValue(SCHEMA_TYPE_I, log_type);
      DBUG_ASSERT(r == 0);
      /* any value */
      Uint32 anyValue = 0;
      if (! thd->slave_thread)
      {
        /* Schema change originating from this MySQLD, check SQL_LOG_BIN
         * variable and pass 'setting' to all logging MySQLDs via AnyValue
         */
        if (thd_options(thd) & OPTION_BIN_LOG) /* e.g. SQL_LOG_BIN == on */
        {
          DBUG_PRINT("info", ("Schema event for binlogging"));
          ndbcluster_anyvalue_set_normal(anyValue);
        }
        else
        {
          DBUG_PRINT("info", ("Schema event not for binlogging"));
          ndbcluster_anyvalue_set_nologging(anyValue);
        }
      }
      else
      {
        /*
           Slave propagating replicated schema event in ndb_schema
           In case replicated serverId is composite
           (server-id-bits < 31) we copy it into the
           AnyValue as-is
           This is for 'future', as currently Schema operations
           do not have composite AnyValues.
           In future it may be useful to support *not* mapping composite
           AnyValues to/from Binlogged server-ids.
        */
        DBUG_PRINT("info", ("Replicated schema event with original server id %d",
                            thd->server_id));
        anyValue = thd_unmasked_server_id(thd);
      }

#ifndef DBUG_OFF
      /*
        MySQLD will set the user-portion of AnyValue (if any) to all 1s
        This tests code filtering ServerIds on the value of server-id-bits.
      */
      const char* p = getenv("NDB_TEST_ANYVALUE_USERDATA");
      if (p != 0 && *p != 0 && *p != '0' && *p != 'n' && *p != 'N')
      {
        dbug_ndbcluster_anyvalue_set_userbits(anyValue);
      }
#endif
      r|= op->setAnyValue(anyValue);
      DBUG_ASSERT(r == 0);
      break;
    }

    if (trans->execute(NdbTransaction::Commit, NdbOperation::DefaultAbortOption,
                       1 /* force send */) == 0)
    {
      DBUG_PRINT("info", ("logged: %s", query));
      dict->forceGCPWait(1);
      break;
    }
err:
    /* Use the transaction's error if there is one, else the Ndb object's */
    const NdbError *this_error= trans ?
      &trans->getNdbError() : &ndb->getNdbError();
    if (this_error->status == NdbError::TemporaryError && !thd->killed)
    {
      if (retries--)
      {
        if (trans)
          ndb->closeTransaction(trans);
        do_retry_sleep(retry_sleep);
        continue; // retry
      }
    }
    ndb_error= this_error;
    break;
  }

end:
  /*
    ndb_error may point at an NdbError obtained from 'trans'. Copy the
    code now so that it is not read through the transaction after the
    transaction has been closed below.
  */
  const int ndb_error_code= ndb_error ? ndb_error->code : 0;

  /*
    NOTE(review): the last argument is passed as data for the
    ER_GET_ERRMSG format, so its '%s' is presumably printed as is and
    not replaced by the query - confirm that this is intended.
  */
  if (ndb_error)
    push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
                        ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                        ndb_error->code,
                        ndb_error->message,
                        "Could not log query '%s' on other mysqld's");

  if (trans)
    ndb->closeTransaction(trans);
  ndb->setDatabaseName(save_db);

  if (opt_ndb_extra_logging > 19)
  {
    sql_print_information("NDB: distributed %s.%s(%u/%u) type: %s(%u) query: \'%s\' to %x%x",
                          db,
                          table_name,
                          ndb_table_id,
                          ndb_table_version,
                          get_schema_type_name(log_type),
                          log_type,
                          query,
                          ndb_schema_object->slock_bitmap.bitmap[0],
                          ndb_schema_object->slock_bitmap.bitmap[1]);
  }

  /*
    Wait for other mysqld's to acknowledge the table operation
  */
  if (ndb_error == 0 && !bitmap_is_clear_all(&ndb_schema_object->slock_bitmap))
  {
    /* Number of timed out waits allowed before giving up */
    int max_timeout= DEFAULT_SYNC_TIMEOUT;
    pthread_mutex_lock(&ndb_schema_object->mutex);
    while (1)
    {
      struct timespec abstime;
      int i;
      int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
      set_timespec(abstime, 1);
      int ret= pthread_cond_timedwait(&injector_cond,
                                      &ndb_schema_object->mutex,
                                      &abstime);
      if (thd->killed)
        break;

      /* begin protect ndb_schema_share */
      pthread_mutex_lock(&ndb_schema_share_mutex);
      if (ndb_schema_share == 0)
      {
        pthread_mutex_unlock(&ndb_schema_share_mutex);
        break;
      }
      /* Collect the servers which are subscribed right now */
      MY_BITMAP servers;
      bitmap_init(&servers, 0, 256, FALSE);
      bitmap_clear_all(&servers);
      bitmap_set_bit(&servers, node_id); // "we" are always alive
      pthread_mutex_lock(&ndb_schema_share->mutex);
      for (i= 0; i < no_storage_nodes; i++)
      {
        /* remove any unsubscribed from schema_subscribers */
        MY_BITMAP *tmp= &ndb_schema_share->subscriber_bitmap[i];
        bitmap_union(&servers, tmp);
      }
      pthread_mutex_unlock(&ndb_schema_share->mutex);
      pthread_mutex_unlock(&ndb_schema_share_mutex);
      /* end protect ndb_schema_share */

      /* remove any unsubscribed from ndb_schema_object->slock */
      bitmap_intersect(&ndb_schema_object->slock_bitmap, &servers);
      bitmap_free(&servers);

      /* Done when no server is left to wait for */
      if (bitmap_is_clear_all(&ndb_schema_object->slock_bitmap))
        break;

      /* A non zero return from the wait counts against the timeout */
      if (ret)
      {
        max_timeout--;
        if (max_timeout == 0)
        {
          sql_print_error("NDB %s: distributing %s timed out. Ignoring...",
                          type_str, ndb_schema_object->key);
          DBUG_ASSERT(false);
          break;
        }
        if (opt_ndb_extra_logging)
          ndb_report_waiting(type_str, max_timeout,
                             "distributing", ndb_schema_object->key,
                             &ndb_schema_object->slock_bitmap);
      }
    }
    pthread_mutex_unlock(&ndb_schema_object->mutex);
  }
  else if (ndb_error)
  {
    sql_print_error("NDB %s: distributing %s err: %u",
                    type_str, ndb_schema_object->key,
                    ndb_error_code);
  }
  else if (opt_ndb_extra_logging > 19)
  {
    sql_print_information("NDB %s: not waiting for distributing %s",
                          type_str, ndb_schema_object->key);
  }

  if (ndb_schema_object)
    ndb_free_schema_object(&ndb_schema_object, FALSE);

  if (opt_ndb_extra_logging > 19)
  {
    sql_print_information("NDB: distribution of %s.%s(%u/%u) type: %s(%u) query: \'%s\'"
                          " - complete!",
                          db,
                          table_name,
                          ndb_table_id,
                          ndb_table_version,
                          get_schema_type_name(log_type),
                          log_type,
                          query);
  }

  if (thd->slave_thread)
    update_slave_api_stats(ndb);

  DBUG_RETURN(0);
}