def pick_source_instance()

in mysqloperator/backup_main.py [0:0]


def pick_source_instance(cluster: InnoDBCluster, backup: MySQLBackup,
                         profile: BackupProfile, logger: logging.Logger) -> dict:
    """Choose the cluster member a backup should be taken from.

    Pods flagged as deleting, and pods that cannot be connected to or
    queried, are skipped with a warning.

    Incremental MEB backups (``backup.parsed_spec.incremental`` and
    ``profile.meb`` both truthy): each pod is asked for the newest successful
    row in ``mysql.backup_history`` taken at or after the last successful
    full backup. The first pod whose ``@@server_uuid`` equals that row's
    ``server_uuid`` is selected. No other pod is acceptable.

    All other backups: only members reported ``ONLINE`` in a cluster whose
    status starts with ``OK`` are considered. The ``SECONDARY`` with the
    smallest applier queue wins (the scan stops early on a secondary with an
    empty queue); if there is no usable secondary, the last usable
    non-secondary member seen is returned.

    Args:
        cluster: Cluster whose pods are scanned via ``get_pods()``.
        backup: Backup request; only ``parsed_spec.incremental`` is read.
        profile: Backup profile; only ``meb`` is read.
        logger: Receives per-pod diagnostics.

    Returns:
        The ``endpoint_co`` of the selected pod.

    Raises:
        Exception: If no pod qualifies.
    """
    # Loop-invariant: an incremental MEB backup is pinned to one host.
    incremental_meb = backup.parsed_spec.incremental and profile.meb

    # If an incremental backup is requested we schedule on the
    # same host as last full backup. This may be a busy primary.
    last_backup_host_sql = """WITH last_full AS (
                            SELECT
                                MAX(consistency_time_utc) AS last_full_time
                            FROM
                                mysql.backup_history
                            WHERE
                                backup_type = "FULL" AND exit_state = "SUCCESS"
                        )
                        SELECT
                            gm.MEMBER_HOST backup_host,
                            bh.server_uuid backup_uuid,
                            @@server_uuid this_uuid,
                            MEMBER_STATE,
                            exit_state
                        FROM
                            mysql.backup_history bh
                        JOIN last_full
                            ON bh.consistency_time_utc >= last_full.last_full_time
                        LEFT JOIN performance_schema.replication_group_members gm
                            ON gm.MEMBER_ID = bh.server_uuid
                        WHERE
                            bh.exit_state = "SUCCESS"
                        ORDER BY  bh.consistency_time_utc DESC"""

    applier_queue_sql = (
        "SELECT COUNT_TRANSACTIONS_REMOTE_IN_APPLIER_QUEUE"
        " FROM performance_schema.replication_group_member_stats"
        " WHERE member_id = @@server_uuid")

    primary = None
    best_secondary = None
    best_secondary_applier_queue_size = None

    for pod in cluster.get_pods():
        if pod.deleting:
            continue
        try:
            with shellutils.DbaWrap(shellutils.connect_dba(pod.endpoint_co, logger, max_tries=3)) as dba:
                if incremental_meb:
                    try:
                        # TODO - if we are in a cronjob we may want to check
                        #        amount of incremental backups or have a
                        #        fallback for taking full backups
                        incremental_info = dba.session.run_sql(
                            last_backup_host_sql).fetch_one_object()
                    except mysqlsh.Error as e:
                        logger.warning(
                            f"Could not get MEB backup status from {pod}: {e}")
                        continue

                    # An empty result means there is no successful backup to
                    # base an incremental one on, as far as this pod knows.
                    if incremental_info is None:
                        logger.warning(
                            f"No successful MEB backup found in backup history on {pod}")
                        continue

                    if incremental_info['backup_uuid'] == incremental_info['this_uuid']:
                        # By definition we are online, thus not checking MEMBER_STATE
                        return pod.endpoint_co
                    continue

                try:
                    tmp = dba.get_cluster().status({"extended": 1})["defaultReplicaSet"]
                    cluster_status = tmp["status"]
                    self_uuid = dba.session.run_sql("select @@server_uuid").fetch_one()[0]
                    member_status = next(
                        (x for x in tmp["topology"].values() if x["memberId"] == self_uuid),
                        None)
                except mysqlsh.Error as e:
                    logger.warning(
                        f"Could not get cluster status from {pod}: {e}")
                    continue

                # The instance may not be listed in the topology it reports.
                if member_status is None:
                    logger.warning(
                        f"Could not find {pod} in the cluster topology")
                    continue

                stats_row = dba.session.run_sql(applier_queue_sql).fetch_one()
                # No stats row means the applier backlog cannot be ranked.
                if stats_row is None:
                    logger.warning(
                        f"Could not get applier queue size from {pod}")
                    continue
                applier_queue_size = stats_row[0]

        except mysqlsh.Error as e:
            logger.warning(f"Could not connect to {pod}: {e}")
            continue

        logger.info(
            f"Cluster status from {pod} is {cluster_status}, member_status={member_status} applier_queue_size={applier_queue_size}")
        if not cluster_status.startswith("OK") or member_status["memberState"] != "ONLINE":
            continue

        if member_status["memberRole"] == "SECONDARY":
            if not best_secondary or applier_queue_size < best_secondary_applier_queue_size:
                best_secondary = pod.endpoint_co
                best_secondary_applier_queue_size = applier_queue_size

                # Nothing can beat an empty applier queue; stop scanning.
                if applier_queue_size == 0:
                    break
        else:
            primary = pod.endpoint_co

    if incremental_meb:
        # Reaching this point means no pod matched the host of the last
        # backup; any other member would not be a valid source.
        raise Exception(
            f"No instances available to backup from in cluster {cluster.name}. Incremental backup has to be taken from the same host as the last full backup.")

    if best_secondary:
        return best_secondary
    elif primary:
        return primary

    raise Exception(
        f"No instances available to backup from in cluster {cluster.name}")