sql/rpl_handler.h (293 lines of code) (raw):
/* Copyright (c) 2008, 2014, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software Foundation,
51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
#ifndef RPL_HANDLER_H
#define RPL_HANDLER_H
#include "sql_priv.h"
#include "rpl_gtid.h"
#include "rpl_mi.h"
#include "rpl_rli.h"
#include "sql_plugin.h"
#include "replication.h"
#include "raft_listener_queue_if.h"
class Observer_info {
public:
void *observer;
st_plugin_int *plugin_int;
plugin_ref plugin;
Observer_info(void *ob, st_plugin_int *p)
:observer(ob), plugin_int(p)
{
plugin= plugin_int_to_ref(plugin_int);
}
};
class Delegate {
public:
typedef List<Observer_info> Observer_info_list;
typedef List_iterator<Observer_info> Observer_info_iterator;
int add_observer(void *observer, st_plugin_int *plugin)
{
int ret= FALSE;
if (!inited)
return TRUE;
write_lock();
Observer_info_iterator iter(observer_info_list);
Observer_info *info= iter++;
while (info && info->observer != observer)
info= iter++;
if (!info)
{
info= new Observer_info(observer, plugin);
if (!info || observer_info_list.push_back(info, &memroot))
ret= TRUE;
}
else
ret= TRUE;
unlock();
return ret;
}
// In some observers like raft, it might be that deinit
// will be called when observers have not been added
// In that case remove_observer will return OBSERVER_NOT_FOUND
// instead of TRUE
int remove_observer(void *observer, st_plugin_int *plugin)
{
int ret= FALSE;
if (!inited)
return TRUE;
write_lock();
Observer_info_iterator iter(observer_info_list);
Observer_info *info= iter++;
while (info && info->observer != observer)
info= iter++;
if (info)
{
iter.remove();
delete info;
}
else
ret= MYSQL_REPLICATION_OBSERVER_NOT_FOUND;
unlock();
return ret;
}
inline Observer_info_iterator observer_info_iter()
{
return Observer_info_iterator(observer_info_list);
}
inline bool is_empty()
{
DBUG_PRINT("debug", ("is_empty: %d", observer_info_list.is_empty()));
return observer_info_list.is_empty();
}
inline int read_lock()
{
if (!inited)
return TRUE;
return mysql_rwlock_rdlock(&lock);
}
inline int write_lock()
{
if (!inited)
return TRUE;
return mysql_rwlock_wrlock(&lock);
}
inline int unlock()
{
if (!inited)
return TRUE;
return mysql_rwlock_unlock(&lock);
}
inline bool is_inited()
{
return inited;
}
Delegate(
#ifdef HAVE_PSI_INTERFACE
PSI_rwlock_key key
#endif
)
{
inited= FALSE;
#ifdef HAVE_PSI_INTERFACE
if (mysql_rwlock_init(key, &lock))
return;
#else
if (mysql_rwlock_init(0, &lock))
return;
#endif
init_sql_alloc(&memroot, 1024, 0);
inited= TRUE;
}
~Delegate()
{
inited= FALSE;
mysql_rwlock_destroy(&lock);
free_root(&memroot, MYF(0));
}
private:
Observer_info_list observer_info_list;
mysql_rwlock_t lock;
MEM_ROOT memroot;
bool inited;
};
#ifdef HAVE_PSI_INTERFACE
extern PSI_rwlock_key key_rwlock_Trans_delegate_lock;
#endif
class Trans_delegate
:public Delegate {
public:
Trans_delegate()
: Delegate(
#ifdef HAVE_PSI_INTERFACE
key_rwlock_Trans_delegate_lock
#endif
)
{}
typedef Trans_observer Observer;
int before_commit(THD *thd, bool all);
int before_rollback(THD *thd, bool all);
int after_commit(THD *thd, bool all);
int after_rollback(THD *thd, bool all);
};
#ifdef HAVE_PSI_INTERFACE
extern PSI_rwlock_key key_rwlock_Binlog_storage_delegate_lock;
#endif
class Binlog_storage_delegate
:public Delegate {
public:
Binlog_storage_delegate()
: Delegate(
#ifdef HAVE_PSI_INTERFACE
key_rwlock_Binlog_storage_delegate_lock
#endif
)
{}
typedef Binlog_storage_observer Observer;
int after_flush(THD *thd, const char *log_file,
my_off_t log_pos);
int before_flush(THD *thd, IO_CACHE* io_cache);
};
#ifdef HAVE_REPLICATION
#ifdef HAVE_PSI_INTERFACE
extern PSI_rwlock_key key_rwlock_Binlog_transmit_delegate_lock;
#endif
class Binlog_transmit_delegate
:public Delegate {
public:
Binlog_transmit_delegate()
: Delegate(
#ifdef HAVE_PSI_INTERFACE
key_rwlock_Binlog_transmit_delegate_lock
#endif
)
{}
typedef Binlog_transmit_observer Observer;
int transmit_start(THD *thd, ushort flags,
const char *log_file, my_off_t log_pos,
bool *observe_transmission);
int transmit_stop(THD *thd, ushort flags);
int reserve_header(THD *thd, ushort flags, String *packet);
int before_send_event(THD *thd, ushort flags,
String *packet, const
char *log_file, my_off_t log_pos );
int after_send_event(THD *thd, ushort flags,
String *packet, const char *skipped_log_file,
my_off_t skipped_log_pos);
int after_reset_master(THD *thd, ushort flags);
};
#ifdef HAVE_PSI_INTERFACE
extern PSI_rwlock_key key_rwlock_Binlog_relay_IO_delegate_lock;
#endif
class Binlog_relay_IO_delegate
:public Delegate {
public:
Binlog_relay_IO_delegate()
: Delegate(
#ifdef HAVE_PSI_INTERFACE
key_rwlock_Binlog_relay_IO_delegate_lock
#endif
)
{}
typedef Binlog_relay_IO_observer Observer;
int thread_start(THD *thd, Master_info *mi);
int thread_stop(THD *thd, Master_info *mi);
int before_request_transmit(THD *thd, Master_info *mi, ushort flags);
int after_read_event(THD *thd, Master_info *mi,
const char *packet, ulong len,
const char **event_buf, ulong *event_len);
int after_queue_event(THD *thd, Master_info *mi,
const char *event_buf, ulong event_len,
bool synced);
int after_reset_slave(THD *thd, Master_info *mi);
private:
void init_param(Binlog_relay_IO_param *param, Master_info *mi);
};
#endif /* HAVE_REPLICATION */
#ifdef HAVE_PSI_INTERFACE
extern PSI_rwlock_key key_rwlock_Raft_replication_delegate_lock;
#endif
class Raft_replication_delegate : public Delegate {
public:
Raft_replication_delegate() : Delegate(
#ifdef HAVE_PSI_INTERFACE
key_rwlock_Raft_replication_delegate_lock
#endif
)
{}
typedef Raft_replication_observer Observer;
int before_flush(THD *thd, IO_CACHE* io_cache,
RaftReplicateMsgOpType
op_type= RaftReplicateMsgOpType::OP_TYPE_TRX);
int before_commit(THD *thd, bool all);
int setup_flush(THD *thd, Observer::st_setup_flush_arg *arg);
int before_shutdown(THD *thd);
int register_paths(THD *thd, const std::string &s_uuid, uint32_t server_id,
const std::string &wal_dir_parent,
const std::string &log_dir_parent,
const std::string &raft_log_path_prefix,
const std::string &s_hostname, uint64_t port);
int after_commit(THD *thd, bool all);
int purge_logs(THD *thd, uint64_t file_ext);
int show_raft_status(THD *thd,
std::vector<std::pair<std::string, std::string>> *var_value_pairs);
int inform_applier_health(THD *thd, bool healthy);
int inform_heartbeats_health(THD *thd, bool healthy);
};
int delegates_init();
void delegates_destroy();
extern Trans_delegate *transaction_delegate;
extern Binlog_storage_delegate *binlog_storage_delegate;
#ifdef HAVE_REPLICATION
extern Binlog_transmit_delegate *binlog_transmit_delegate;
extern Binlog_relay_IO_delegate *binlog_relay_io_delegate;
#endif /* HAVE_REPLICATION */
extern Raft_replication_delegate *raft_replication_delegate;
/*
if there is no observers in the delegate, we can return 0
immediately.
*/
#define RUN_HOOK(group, hook, args) \
(group ##_delegate->is_empty() ? \
0 : group ##_delegate->hook args)
/*
This is same as RUN_HOOK, but return 1 if there are no observers
*/
#define RUN_HOOK_STRICT(group, hook, args) \
(group ##_delegate->is_empty() ? \
1 : group ##_delegate->hook args)
#endif /* RPL_HANDLER_H */
class RaftListenerQueue : public RaftListenerQueueIf
{
public:
explicit RaftListenerQueue()
{
inited_.store(false);
}
~RaftListenerQueue();
/* Init the queue, this will create a listening thread for this queue
*
* @return 0 on success, 1 on error
*/
int init();
/* Deinit the queue. This will add an exit event into the queue which will
* be picked up by any listening thread and it will stop listening */
void deinit();
/* Add an element to the queue. This will signal any listening threads
* after adding the element to the queue
*
* @param element QueueElement to add to queue
*
* @return 0 on success, 1 on error
*/
int add(QueueElement element);
/* Get an element from the queue. This will block if there are no elements
* in the queue to be processed
*
* @return QueueElement to be processed next
*/
QueueElement get();
private:
std::mutex queue_mutex_; // Lock guarding the queue
std::condition_variable queue_cv_; // CV to wait and signal
std::queue<QueueElement> queue_; // The queue of events to be processed
std::mutex init_mutex_; // Mutex to guard against init and deinit races
std::atomic_bool inited_; // Has this been inited?
};
// A container to help passing raft related rotation info through
// the code. This will be created by the handler and then passed along.
struct RaftRotateInfo {
// The payload of config change that contains before configuration
// and after configuration. Generated with help from Raft by plugin.
std::string config_change;
// The name of the new log file after rotation. In Raft case, it
// contains the name of the leader's next log.
std::string new_log_ident;
// The starting position of the next file. Typically 4.
ulonglong pos= 0;
// Is this rotation to get consensus on a NO-OP event after winning
// election.
bool noop= false;
// if true, the rotate event has already been appended to relay log
bool post_append= false;
// If true, the server will need to replicate and get consensus on
// rotate event.
bool rotate_via_raft= false;
// goes together with config_change above, if true, this is a
// rotation to be initiated by server to get consensus on a
// config change (add/remove/modify of the ring)
bool config_change_rotate= false;
// This is the opid of the rotate event which is either
// passed in by plugin or obtained from before_flush.
// During rotation of raft logs, this is put into Metadata event
// as previous opid
std::pair<int64_t, int64_t> rotate_opid= std::make_pair(0,0);
};