in mysqloperator/backup_main.py [0:0]
def pick_source_instance(cluster: InnoDBCluster, backup: MySQLBackup,
                         profile: BackupProfile, logger: logging.Logger) -> dict:
    """Choose the cluster member a backup should be taken from.

    Every pod returned by ``cluster.get_pods()`` that is not being deleted is
    probed in turn. A pod that cannot be connected to or queried
    (``mysqlsh.Error``) is logged with a warning and skipped.

    Two selection strategies exist:

    * Incremental MEB backup (``backup.parsed_spec.incremental`` and
      ``profile.meb`` are both truthy): the first pod whose ``@@server_uuid``
      equals the ``server_uuid`` of the newest successful entry in its
      ``mysql.backup_history`` (counting entries from the latest successful
      full backup onwards) is returned. No other pod is acceptable.
    * Any other backup: only members reported as ``ONLINE`` in a cluster whose
      status starts with ``OK`` are considered. The secondary with the
      smallest applier queue is preferred, and the search stops as soon as a
      secondary with an empty queue is found. The primary is used only when
      no secondary qualifies.

    Args:
        cluster: Cluster whose pods are candidates; ``cluster.name`` is used
            in error messages.
        backup: Backup request; only ``parsed_spec.incremental`` is read.
        profile: Backup profile; only ``meb`` is read.
        logger: Receives per-pod warnings and the status of each probed pod.

    Returns:
        The ``endpoint_co`` value of the selected pod.

    Raises:
        Exception: No pod qualified. For incremental MEB backups the message
            states that the backup must run on the host of the last full
            backup.
    """
    # Evaluated once: decides which of the two strategies above applies.
    incremental_meb = backup.parsed_spec.incremental and profile.meb

    primary = None
    best_secondary = None
    best_secondary_applier_queue_size = None

    for pod in cluster.get_pods():
        if pod.deleting:
            continue
        try:
            with shellutils.DbaWrap(shellutils.connect_dba(pod.endpoint_co, logger, max_tries=3)) as dba:
                if incremental_meb:
                    # If an incremental backup is requested we schedule on the
                    # same host as last full backup. This may be a busy primary.
                    sql = """WITH last_full AS (
                                SELECT
                                    MAX(consistency_time_utc) AS last_full_time
                                FROM
                                    mysql.backup_history
                                WHERE
                                    backup_type = "FULL" AND exit_state = "SUCCESS"
                            )
                            SELECT
                                gm.MEMBER_HOST backup_host,
                                bh.server_uuid backup_uuid,
                                @@server_uuid this_uuid,
                                MEMBER_STATE,
                                exit_state
                            FROM
                                mysql.backup_history bh
                                JOIN last_full
                                    ON bh.consistency_time_utc >= last_full.last_full_time
                                LEFT JOIN performance_schema.replication_group_members gm
                                    ON gm.MEMBER_ID = bh.server_uuid
                            WHERE
                                bh.exit_state = "SUCCESS"
                            ORDER BY bh.consistency_time_utc DESC"""
                    try:
                        # TODO - if we are in a cronjob we may want to check
                        #        amount of incremental backups or have a
                        #        fallback for taking full backups
                        incremental_info = dba.session.run_sql(sql).fetch_one_object()
                    except mysqlsh.Error as e:
                        logger.warning(
                            f"Could not get MEB backup status from {pod}: {e}")
                        continue

                    # The query yields no row when this instance has no
                    # successful full backup recorded; the result is then
                    # None and the pod simply is not the one we look for.
                    if (incremental_info is not None
                            and incremental_info['backup_uuid'] == incremental_info['this_uuid']):
                        # By definition we are online, thus not checking MEMBER_STATE
                        return pod.endpoint_co
                    # Incremental backups never fall back to another host.
                    continue

                try:
                    replica_set = dba.get_cluster().status({"extended": 1})["defaultReplicaSet"]
                    cluster_status = replica_set["status"]
                    self_uuid = dba.session.run_sql("select @@server_uuid").fetch_one()[0]
                    # Topology entry describing the instance we are connected to.
                    member_status = next(
                        (member for member in replica_set["topology"].values()
                         if member["memberId"] == self_uuid),
                        None)
                except mysqlsh.Error as e:
                    logger.warning(
                        f"Could not get cluster status from {pod}: {e}")
                    continue

                if member_status is None:
                    # Skip the pod instead of failing the whole selection.
                    logger.warning(
                        f"Could not find {pod} in the cluster topology it reported")
                    continue

                applier_queue_size = dba.session.run_sql(
                    "SELECT COUNT_TRANSACTIONS_REMOTE_IN_APPLIER_QUEUE"
                    " FROM performance_schema.replication_group_member_stats"
                    " WHERE member_id = @@server_uuid").fetch_one()[0]
        except mysqlsh.Error as e:
            logger.warning(f"Could not connect to {pod}: {e}")
            continue

        logger.info(
            f"Cluster status from {pod} is {cluster_status}, member_status={member_status} applier_queue_size={applier_queue_size}")
        if not cluster_status.startswith("OK") or member_status["memberState"] != "ONLINE":
            continue

        if member_status["memberRole"] == "SECONDARY":
            if not best_secondary or applier_queue_size < best_secondary_applier_queue_size:
                best_secondary = pod.endpoint_co
                best_secondary_applier_queue_size = applier_queue_size

                # Nothing can beat a secondary that is fully caught up.
                if applier_queue_size == 0:
                    break
        else:
            primary = pod.endpoint_co

    if incremental_meb:
        # Reached only when no pod matched the host of the last backup.
        raise Exception(
            f"No instances available to backup from in cluster {cluster.name}. Incremental backup has to be taken from the same host as the last full backup.")

    if best_secondary:
        return best_secondary
    elif primary:
        return primary

    raise Exception(
        f"No instances available to backup from in cluster {cluster.name}")