#
# Copyright (c) 2016, 2024, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms,
# as designated in a particular file or component or in included license
# documentation.  The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# This program is distributed in the hope that it will be useful,  but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#

"""
This module contains an abstraction of a MySQL server object used
by multiple gadgets. It also contains methods and functions for common server
operations.
"""

import logging
import os
import random
import re
import socket
import threading
import time
import sys

import subprocess

import mysqlsh

from mysql_gadgets import MIN_MYSQL_VERSION, MAX_MYSQL_VERSION
from mysql_gadgets.exceptions import (GadgetCnxInfoError, GadgetCnxError,
                                      GadgetQueryError, GadgetServerError,
                                      GadgetError)
from mysql_gadgets.common.connection_parser import (parse_connection,
                                                    hostname_is_ip,
                                                    clean_IPv6,)
from mysql_gadgets.common.logger import CustomLevelLogger
from mysql_gadgets.common.tools import (get_abs_path,
                                        is_executable, run_subprocess,
                                        shell_quote)

CR_SERVER_LOST = 2013
ER_OPTION_PREVENTS_STATEMENT = 1290

_FOREIGN_KEY_SET = "SET foreign_key_checks = {0}"
_AUTOCOMMIT_SET = "SET AUTOCOMMIT = {0}"
_GTID_ERROR = ("The server {host}:{port} does not comply to the latest GTID "
               "feature support. Errors:")

logging.setLoggerClass(CustomLevelLogger)
_LOGGER = logging.getLogger(__name__)


class Secret(object):
    def __init__(self, s):
        super(Secret, self).__init__()
        self.value = s

    def __str__(self):
        return str(self.value)

    def __repr__(self):
        return repr(self.value)


class Query(object):
    def __init__(self, query_string, *params):
        super(Query, self).__init__()
        self._query_string = query_string
        self._params = params

    def extend(self, query_string, *params):
        self._query_string += " " + query_string
        self._params.extend(params)

    def query(self):
        return self._query_string

    def params(self, mask=False):
        if mask:
            return tuple(map(lambda x: "<secret>" if isinstance(x, Secret) else x, self._params))

        return tuple(map(lambda x: x.value if isinstance(x, Secret) else x, self._params))

    def __str__(self):
        if self._params:
            return "{0}, with params: {1}".format(self.query(), self.params(True))

        return self.query()

    def log(self):
        return self.__str__()


def _to_str(value, charset="utf-8"):
    """Cast value to str except when None

    :param value:      Value to be cast to str
    :type  value:      bytearray
    :param charset:    the charset to decode the value. Used only on python 3
    :type  charset:    string

    :return: value as string instance or None.
    :rtype: string or None
    """
    if sys.version_info > (3, 0):
        # for Python 3 use bytes instead of str
        return None if value is None else bytes(value).decode(charset)
    else:
        return None if value is None else str(value)


def get_mysqld_version(mysqld_path):
    """Get the version of the mysql server through the mysqld executable.

    :param mysqld_path: absolute path to the mysqld executable
    :type mysqld_path: str
    :return: tuple with with major, minor and release number and version string
    :rtype tuple((int, int, int), str)
    """
    cmd = u"{0} --version".format(shell_quote(mysqld_path))
    version_proc = run_subprocess(cmd,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE, shell=False,
                                  universal_newlines=True)
    output, error = version_proc.communicate()
    match = re.match(r'^.*mysqld.*?\s+(Ver\s+(\d+\.\d+(?:\.\d+)*).*)', output)
    if match:
        return tuple(int(n) for n in match.group(2).split('.')), match.group(1)
    else:
        error_msg = ''
        if error:
            error_msg = " Error executing '{0}': {1}".format(cmd, error)
        raise GadgetError(
            "Unable to parse version output '{0}' from mysqld executable '{1}'"
            ".{2}".format(output, mysqld_path, error_msg))


def is_valid_mysqld(mysqld_path):
    """Check if the provided mysqld is valid.

    Check the version of the given mysqld and return True if valid and
    False otherwise.

    :param mysqld_path: Path to mysqld to check.
    :type mysqld_path: string

    :return: True if the provided mysqld is valid, or False otherwise.
    :rtype: bool
    """
    # Check if mysqld is executable and version is valid.
    if is_executable(mysqld_path):
        mysqld_ver, _ = get_mysqld_version(mysqld_path)
        if MIN_MYSQL_VERSION <= mysqld_ver < MAX_MYSQL_VERSION:
            _LOGGER.debug("Valid mysqld found with version %s: '%s'",
                          ".".join(str(num) for num in mysqld_ver),
                          mysqld_path)
            return True

    # mysqld is not valid.
    _LOGGER.debug("Invalid mysqld version (or not executable): '%s'",
                  mysqld_path)
    return False


class MySQLUtilsCursorResult(object):
    def __init__(self, result):
        self._result = result
        self.with_rows = result.has_data()
        self.column_names = result.column_names

    def fetchone(self):
        row = self._result.fetch_one()
        if row:
            return tuple(str(row[i]) if row[i] is not None else 'NULL'
                         for i in range(row.length))
        return None

    def fetchall(self):
        rows = []
        row = self.fetchone()
        while row:
            rows.append(row)
            row = self.fetchone()
        return rows

    def close(self):
        pass


def get_connection_dictionary(conn_info, ssl_dict=None):
    """Get the connection dictionary.

    Convert the given connection information into a dictionary.
    The method accepts one of the following types for the connection
    information:
        - dictionary containing connection information including:
          (user, passwd, host, port, socket)
        - connection string in the form: user:pass@host:port:socket
        - an instance of the Server class

    :param conn_info: Connection information to be converted.
    :type  conn_info: dictionary or Server object or string
    :param ssl_dict:  A dictionary with the ssl options.
    :type  ssl_dict:  dictionary

    :return dictionary with connection information (user, passwd, host, port,
            socket).
    :rtype: dictionary
    """
    if conn_info is None:
        return conn_info

    if isinstance(conn_info, dict):
        # Not update conn_info if already has any ssl certificate.
        if (ssl_dict is not None and
                not (conn_info.get("ssl_ca", None) or
                     conn_info.get("ssl_cert", None) or
                     conn_info.get("ssl_key", None) or
                     conn_info.get("ssl", None))):
            conn_info.update(ssl_dict)
        conn_val = conn_info
    elif isinstance(conn_info, Server):
        # get server's dictionary
        conn_val = conn_info.get_connection_values()
    elif isinstance(conn_info, str):
        # parse the string
        conn_val = parse_connection(conn_info, options=ssl_dict)
    else:
        raise GadgetCnxInfoError("Cannot determine connection information"
                                 " type.")

    return conn_val


def check_hostname_alias(server1_cnx_values, server2_cnx_values):
    """Check to see if the servers are the same machine by host name.

    :param server1_cnx_values:   connection values for server 1
    :type  server1_cnx_values:   dictionary
    :param server2_cnx_values:   connection values for server 2
    :type  server2_cnx_values:   dictionary

    :return: True if both servers are the same otherwise False.
    :rtype: boolean
    """
    server1 = Server({'conn_info': server1_cnx_values})
    server2 = Server({'conn_info': server2_cnx_values})

    return (server1.is_alias(server2.host) and
            int(server1.port) == int(server2.port))


def get_server(server_info, ssl_dict=None, connect=True):
    """Get a server instance from server connection information | string or
    a Server instance.

    The method accepts one of the following types for server_info:

        - dictionary containing connection information including:
          (user, passwd, host, port, socket)
        - connection string in the form: user:pass@host:port:socket or
                                         login-path:port:socket
        - an instance of the Server class

    :param server_info:  Connection information
    :type  server_info:  dict | Server | str
    :param ssl_dict:     A dictionary with the ssl certificates
    :type  ssl_dict:     dict
    :param connect:      Attempt to connect to the server
    :type  connect:      boolean

    :raise GadgetCnxInfoError:  if the connection information on server_info
                                could not be parsed
    :raise GadgetServerError:   if a connection fails.

    @returns a Server instance
    :rtype: Server
    """
    if server_info is None:
        return server_info

    if isinstance(server_info, dict) and 'host' in server_info:
        # Don't update server_info if already has any ssl certificate.
        if (ssl_dict is not None and
                not (server_info.get("ssl_ca", None) or
                     server_info.get("ssl_cert", None) or
                     server_info.get("ssl_key", None) or
                     server_info.get("ssl", None))):
            server_info.update(ssl_dict)

        options = {"conn_info": server_info}
        server = Server(options)

    elif isinstance(server_info, Server):
        server = server_info

    elif isinstance(server_info, str):
        # parse the string
        conn_val = parse_connection(server_info, options=ssl_dict)
        options = {"conn_info": conn_val}
        server = Server(options)

    else:
        raise GadgetCnxInfoError("Cannot determine connection information"
                                 " type.")

    if connect:
        server.connect()

    return server


def generate_server_id(strategy='RANDOM'):
    """Generate a server ID based on the provided strategy.

    Two strategies are supported to generate a server_id: TIME and RANDOM.

    The TIME strategy allow to generate a server_id based on the current
    timestamp, ensuring that a unique server_id can be generated every second
    during approximately 31 years (until 2038, epoch end for the time.time()
    function in Unix). If different server_ids are generated during the same
    second they will all have the same value. Minimum value generated is 1
    and maximum 999999999.

    The RANDOM strategy allow to generate a pseudo-random server_id, with a
    value between 1 and 4294967295. Two generated values have a low
    probability of being the same (independently of the time they are
    generated). Minimum value generated is 1 and maximum 4294967295.

    For the random generation (assuming random generation is uniformly) the
    probability of the same value being generated is given by:
        P(n, t) = 1 - t!/(t^n * (t-n)!)
    where t is the total number of different values that can be generated, and
    n is the number of values that are generated.

    In this case, t = 4294967295 (max number of values that can be generated),
    and for example the probability of generating the same id for 15, 100, and
    1000 servers (n=15, n=100, and n=1000) is approximately:
        P(15, 4294967295)   = 2.44 * 10^-8    (0.00000244 %)
        P(100, 4294967295)  = 1.15 * 10^-6    (0.000115 %)
        P(1000, 4294967295) = 1.16 * 10^-4    (0.0116 %)

    Note: Zero is not a valid sever_id.

    :param strategy:  Strategy used to generate the server id value. Supported
                      values: 'RANDOM' to generate a random ID and 'TIME' to
                      generate the ID based on the current timestamp.
                      By default: 'RANDOM'.
    :type strategy:   String

    :return: The generated server_id.
    :rtype: string

    :raises GadgetError: If an unsupported strategy is specified.
    """
    strategy = strategy.upper()
    if strategy == 'RANDOM':
        # Generate random int between the min valid and max allowed value.
        # Note: 0 is not a valid server_id.
        return str(random.randint(1, 4294967295))
    elif strategy == 'TIME':
        # Only the nine last digits from the time can be used to avoid
        # generating a value higher than the max allowed server_id value.
        server_id = str(int(time.time()))[-9:]
        # 0 is not a valid server_id, in that case return 1 (1 second later).
        if int(server_id) == 0:
            return '1'
        return server_id
    else:
        raise GadgetError("Invalid strategy used to generate the server_id "
                          "(supported values: 'RANDOM' or 'TIME'): "
                          "{0}.".format(strategy))


# pylint: disable=too-many-public-methods
class Server(object):
    """The Server class can be used to connect to a running MySQL server.
    It provides several features, such as:
        - connect/disconnect to the server
        - get the server version
        - Retrieve a server variable
        - Execute a query, commit, and rollback
        - Return list of all databases
        - Read SQL statements from a file and execute
        - check for specific plugin support
        - etc.
    """

    def __init__(self, options):
        """Constructor

        The constructor accepts one of the following types for the
        connection information provided trough the options parameter,
        more specifically for the options['conn_info']:
            - dictionary containing connection information including:
              (user, passwd, host, port, socket)
            - connection string in the form: user:pass@host:port:socket
            - an instance of the Server class

        :param options:    Options and definitions used to create the Server
                           object. Supported key values:
            conn_info      a dictionary containing connection information
                           (user, passwd, host, port, socket)
            role           Name or role of server (e.g., server, master)
            verbose        print extra data during operations (optional)
                           default value = False
            charset        Default character set for the connection.
                           (default None)
        :type  options:    dictionary
        """
        if options is None:
            options = {}

        if options.get("conn_info") is None:
            raise GadgetCnxInfoError(
                "Server connection information missing. The option parameter"
                "must contain a 'conn_info' entry.")

        self.verbose = options.get("verbose", False)
        self.db_conn = None
        self.host = None
        self.role = options.get("role", "Server")
        self.has_ssl = False
        conn_values = get_connection_dictionary(options.get("conn_info"))
        try:
            self.host = conn_values["host"]
            self.user = conn_values["user"]
            self.passwd = conn_values["passwd"] \
                if "passwd" in conn_values else None
            self.socket = conn_values["unix_socket"] \
                if "unix_socket" in conn_values else None
            self.port = 3306
            if conn_values["port"] is not None:
                self.port = int(conn_values["port"])
            self.charset = options.get("charset",
                                       conn_values.get("charset", None))
            # Optional values
            self.ssl_ca = conn_values.get('ssl_ca', None)
            self.ssl_cert = conn_values.get('ssl_cert', None)
            self.ssl_key = conn_values.get('ssl_key', None)
            self.ssl = conn_values.get('ssl', False)
            if self.ssl_cert or self.ssl_ca or self.ssl_key or self.ssl:
                self.has_ssl = True
        except KeyError as err:
            raise GadgetCnxInfoError(
                "Server connection dictionary format not recognized. "
                "Mandatory value missing ({0}): "
                "{1}".format(str(err), conn_values))
        self.connect_error = None
        # Set to TRUE when foreign key checks are ON. Check with
        # foreign_key_checks_enabled.
        self.fkeys = None
        self.autocommit = True  # Set autocommit to True by default.
        self.read_only = False
        self.aliases = set()
        self.grants_enabled = None
        self._version = None
        self._version_full = None

    @classmethod
    def from_server(cls, server, conn_info=None):
        """ Create a new server instance from an existing one.

        Factory method that will allow the creation of a new server instance
        from an existing server.

        :param server:    Source object used to create the new server. It must
                          be an instance of the Server class or a subclass.
        :type  server:    Server object
        :param conn_info: Connection information for the new server. If
                          provided this information will overwrite the one
                          from the source server.
        :type  conn_info: dictionary

        :return: An new instance of the Server class based on the provided
                 source server snf (optional) connection information..
        :rtype: Server object
        """

        if isinstance(server, Server):
            options = {"role": server.role,
                       "verbose": server.verbose,
                       "charset": server.charset}
            if conn_info is not None and isinstance(conn_info, dict):
                options["conn_info"] = conn_info
            else:
                options["conn_info"] = server.get_connection_values()

            return cls(options)
        else:
            raise TypeError("The server argument's type is neither Server nor "
                            "a subclass of Server")

    def is_alive(self):
        """Determine if connection to server is alive.

        :returns: True if server is alive (and responding) or False if an
                  error occurred when trying to connect to the server.
        :rtype: boolean
        """
        res = True
        try:
            if self.db_conn is None:
                res = False
            else:
                # ping and is_connected only work partially, try exec_query
                # to make sure connection is really alive
                retval = self.db_conn.is_open()
                if retval:
                    self.exec_query("SELECT 1")
                else:
                    res = False
        except Exception:  # pylint: disable=W0703
            res = False
        return res

    def _update_alias(self, ip_or_hostname, suffix_list):
        """Update list of aliases for the given IP or hostname.

        Gets the list of aliases for host *ip_or_hostname*. If any
        of them matches one of the server's aliases, then update
        the list of aliases (self.aliases). It also receives a list (tuple)
        of suffixes that can be ignored when checking if two hostnames are
        the same.

        :param ip_or_hostname: IP or hostname to test.
        :type  ip_or_hostname: string
        :param suffix_list:    Tuple with list of suffixes that can be ignored.
        :type  suffix_list:    list

        :returns: True if ip_or_hostname is a server alias, otherwise False.
        :rtype:   boolean
        """
        host_or_ip_aliases = self._get_aliases(ip_or_hostname)
        host_or_ip_aliases.add(ip_or_hostname)

        # Check if any of aliases matches with one the servers's aliases
        common_alias = self.aliases.intersection(host_or_ip_aliases)
        if common_alias:  # There are common aliases, host is the same
            self.aliases.update(host_or_ip_aliases)
            return True
        else:  # Check with and without suffixes
            no_suffix_server_aliases = set()
            no_suffix_host_aliases = set()

            for suffix in suffix_list:
                # Add alias with and without suffix from self.aliases
                for alias in self.aliases:
                    if alias.endswith(suffix):
                        host, _ = alias.rsplit('.', 1)
                        no_suffix_server_aliases.add(host)
                    no_suffix_server_aliases.add(alias)
                # Add alias with and without suffix from host_aliases
                for alias in host_or_ip_aliases:
                    if alias.endswith(suffix):
                        host, _ = alias.rsplit('.', 1)
                        no_suffix_host_aliases.add(host)
                    no_suffix_host_aliases.add(alias)
            # Check if there is any alias in common
            common_alias = no_suffix_host_aliases.intersection(
                no_suffix_server_aliases)
            if common_alias:  # Same host, so update self.aliases
                self.aliases.update(
                    no_suffix_host_aliases.union(no_suffix_server_aliases)
                )
                return True

        return False

    def _get_aliases(self, host):
        """Gets the aliases for the given host.

        :param host:    the host name or IP
        :type  host:    string

        :return: aliases for the given host.
        :rtype:  list
        """
        aliases = set([clean_IPv6(host)])
        if hostname_is_ip(clean_IPv6(host)):  # IP address
            try:
                my_host = socket.gethostbyaddr(clean_IPv6(host))
                aliases.add(my_host[0])
                # socket.gethostbyname_ex() does not work with ipv6
                if (not my_host[0].count(":") < 1 or
                        not my_host[0] == "ip6-localhost"):
                    host_ip = socket.gethostbyname_ex(my_host[0])
                else:
                    addrinfo = socket.getaddrinfo(my_host[0], None)
                    host_ip = ([socket.gethostbyaddr(addrinfo[0][4][0])],
                               [fiveple[4][0] for fiveple in addrinfo],
                               [addrinfo[0][4][0]])
            except (socket.gaierror, socket.herror,
                    socket.error) as err:
                host_ip = ([], [], [])
                if self.verbose:
                    _LOGGER.warning(
                        "IP lookup by address failed for %s, reason: %s",
                        host, err.strerror)
        else:
            try:
                # server may not really exist.
                host_ip = socket.gethostbyname_ex(host)
            except (socket.gaierror, socket.herror,
                    socket.error) as err:
                if self.verbose:
                    _LOGGER.warning(
                        "hostname: %s may not be reachable,reason: %s",
                        host, err.strerror)
                return aliases
            aliases.add(host_ip[0])
            addrinfo = socket.getaddrinfo(host, None)
            local_ip = None
            error = None
            for addr in addrinfo:
                try:
                    local_ip = socket.gethostbyaddr(addr[4][0])
                    break
                except (socket.gaierror, socket.herror,
                        socket.error) as err:
                    error = err

            if local_ip:
                host_ip = ([local_ip[0]],
                           [fiveple[4][0] for fiveple in addrinfo],
                           [addrinfo[0][4][0]])
            else:
                host_ip = ([], [], [])
                if self.verbose:
                    _LOGGER.warning(
                        "IP lookup by name failed for %s, reason: %s",
                        host, error.strerror)
        aliases.update(set(host_ip[1]))
        aliases.update(set(host_ip[2]))

        return aliases

    def is_alias(self, host_or_ip):
        """Determine if the given host is an alias of the server host.

        :param host_or_ip: host or IP address to check.
        :type  host_or_ip: string

        :returns: True is the given host or IP address is an alias of the
                  server host, otherwise False.
        :rtype:   boolean
        """
        # List of possible suffixes
        suffixes = ('.local', '.lan', '.localdomain')

        host_or_ip = clean_IPv6(host_or_ip.lower())

        # for quickness, verify in the existing  aliases, if they exist.
        if self.aliases:
            if host_or_ip.lower() in self.aliases:
                return True
            else:
                # get the alias for the given host_or_ip
                return self._update_alias(host_or_ip, suffixes)

        # no previous aliases information
        # First, get the local information
        hostname_ = socket.gethostname()
        try:
            local_info = socket.gethostbyname_ex(hostname_)
            local_aliases = set([local_info[0].lower()])
            # if dotted host name, take first part and use as an alias
            try:
                local_aliases.add(local_info[0].split('.')[0])
            except Exception:  # pylint: disable=W0703
                pass
            local_aliases.update(['127.0.0.1', 'localhost', '::1', '[::1]'])
            local_aliases.update(local_info[1])
            local_aliases.update(local_info[2])
            local_aliases.update(self._get_aliases(hostname_))
        except (socket.herror, socket.gaierror, socket.error) as err:
            if self.verbose:
                _LOGGER.warning("Unable to find aliases for hostname '%s', "
                                "reason: %s", hostname_, str(err))
            # Try with the basic local aliases.
            local_aliases = set(['127.0.0.1', 'localhost', '::1', '[::1]'])

        # Get the aliases for this server host
        self.aliases = self._get_aliases(self.host)

        # Check if this server is local
        for host in self.aliases.copy():
            if host in local_aliases:
                # Is local then save the local aliases for future.
                self.aliases.update(local_aliases)
                break
            # Handle special suffixes in hostnames.
            for suffix in suffixes:
                if host.endswith(suffix):
                    # Remove special suffix and attempt to match with local
                    # aliases.
                    host, _ = host.rsplit('.', 1)
                    if host in local_aliases:
                        # Is local then save the local aliases for future.
                        self.aliases.update(local_aliases)
                        break

        # Check if the given host_or_ip is alias of the server host.
        if host_or_ip in self.aliases:
            return True

        # Check if any of the aliases of ip_or_host is also an alias of the
        # host server.
        return self._update_alias(host_or_ip, suffixes)

    def user_host_exists(self, user_name, host_name):
        """Check if the 'user_name'@'host_name' account exists.

        This function checks if the specified 'user_name' and 'host_name' match
        an existing account on the server, considering the respective
        parts of the account name 'user_name'@'host_name'.

        Wildcard (%) matches are also taken into consideration for the
        'host_name' part of the account name. For example, if an account
        'myname'@'%' exists then any user with the name 'myname' will match
        the account independently of the host name.

        :param user_name: user name of the account.
        :type  user_name: string
        :param host_name: hostname or IP address of the account.
                          Note: wildcard '%' can be used.
        :type  host_name: string

        :return: True if the given 'user_name' and 'host_name' match an
                 existing account, otherwise False.
        :rtype:  boolean

        :raise GadgetServerError: If an error occurs getting the user accounts
                                  information.
        """
        res = self.exec_query(Query("SELECT host FROM mysql.user WHERE user = ? "
                                    "AND ? LIKE host", user_name, host_name))
        if res:
            return True
        return False

    def get_connection_values(self):
        """Return a dictionary of connection values for the server.

        :return: Return the connection information for the server.
        :rtype:  dictionary
        """
        conn_vals = {
            "user": self.user,
            "host": self.host
        }
        if self.passwd:
            conn_vals["passwd"] = self.passwd
        if self.socket:
            conn_vals["unix_socket"] = self.socket
        if self.port:
            conn_vals["port"] = self.port
        if self.ssl_ca:
            conn_vals["ssl_ca"] = self.ssl_ca
        if self.ssl_cert:
            conn_vals["ssl_cert"] = self.ssl_cert
        if self.ssl_key:
            conn_vals["ssl_key"] = self.ssl_key
        if self.ssl:
            conn_vals["ssl"] = self.ssl

        return conn_vals

    def connect(self):
        """Connect to the server.

        Attempts to connect to the server according to its connection
        parameters.

        Note: This method must be called before executing statements.

        :raise GadgetServerError: if an error occurs during the connection.
        """
        try:
            self.db_conn = self.get_connection()
            if self.ssl:
                res = self.exec_query("SHOW STATUS LIKE 'Ssl_cipher'")
                if res[0][1] == '':
                    raise GadgetCnxError("Can not encrypt server connection.")
        except GadgetServerError:
            # Reset any previous value if the connection cannot be established,
            # before raising an exception. This prevents the use of a broken
            # database connection.
            self.db_conn = None
            raise
        self.connect_error = None
        self.read_only = self.show_server_variable("READ_ONLY")[0][1]

    def get_connection(self):
        """Return a new connection to the server.

        Attempts to connect to the server according to its connection
        parameters and returns a connection object.

        :return: The resulting MySQL connection object.
        :rtype:  MySQLConnection object

        :raise GadgetCnxError: if an error occurred during the server
                               connection process.
        """
        try:
            parameters = {
                'user': self.user,
                'host': self.host,
                'port': self.port,
            }
            if self.socket and os.name == "posix":
                parameters['socket'] = self.socket
            if self.passwd and self.passwd != "":
                parameters['password'] = self.passwd
            parameters['host'] = parameters['host'].replace("[", "")
            parameters['host'] = parameters['host'].replace("]", "")

            # Add SSL parameters ONLY if they are not None
            if self.ssl_ca is not None:
                parameters['ssl-ca'] = self.ssl_ca
            if self.ssl_cert is not None:
                parameters['ssl-cert'] = self.ssl_cert
            if self.ssl_key is not None:
                parameters['ssl-key'] = self.ssl_key

            if self.ssl == False:
                parameters['ssl-mode'] = "DISABLED"

            # The ca certificate is verified only if the ssl option is also
            # specified.
            if self.ssl and parameters['ssl-ca']:
                parameters['ssl-mode'] = "VERIFY_CA"

            if not "ssl-mode" in parameters:
                parameters['ssl-mode'] = "PREFERRED"
            db_conn = mysqlsh.mysql.get_classic_session(parameters)
            # Return MySQL connection object.
            return db_conn
        except mysqlsh.DBError as err:
            _LOGGER.debug("Connector Error: %s", err)
            raise GadgetCnxError(
                err.args[1], err.args[0], cause=err, server=self)
        except AttributeError as err:
            # Might be raised by mysql.connector.connect()
            raise GadgetCnxError(str(err), cause=err, server=self)

    def disconnect(self):
        """Disconnect from the server.
        """
        if self.db_conn is None:
            raise GadgetCnxError("Cannot disconnect from a not connected"
                                 "server. You must use connect() first.")
        try:
            self.db_conn.close()
        except mysqlsh.DBError:
            # No error expected even if already disconnected, anyway ignore it.
            pass

    def get_version(self, full=False):
        """Return version number of the server.

        Get the server version. The respective instance variable is set with
        the result after querying the server the first time. The version is
        immediately returned when already known, avoiding querying the server
        at each time.

        :param full: If True return the full version information including
                     suffixes (e.g., 5.7.14-log), otherwise only the major,
                     minor and release number (e.g., 5.7.14) are returned.
                     By default: False.
        :type  full: boolean

        :return: List of version numbers(ints) if full = False or a string if
                 full = True or None if an error occurs when trying to get
                 the version information from the server.
        :rtype:  string or int list or None
        """
        # Return the local version value if already known.
        if self._version:
            if full:
                return self._version_full
            else:
                return self._version

        # Query the server for its version.
        try:
            res = self.show_server_variable("VERSION")
            if res:
                self._version_full = res[0][1]
                match = re.match(r'^(\d+\.\d+(\.\d+)*).*$',
                                 self._version_full.strip())
                if match:
                    self._version = [int(x) for x in match.group(1).split('.')]
                    # Ensure a 3 elements list
                    self._version = (self._version + [0])[:3]
        except (GadgetCnxError, GadgetQueryError):
            # Ignore errors and return version initialized with None.
            pass

        if full:
            return self._version_full
        else:
            return self._version

    def check_version_compat(self, t_major, t_minor, t_rel):
        """Checks version of the server against requested version.

        This method can be used to check for version compatibility.

        :param t_major: target server version (major)
        :type  t_major: str or int
        :param t_minor: target server version (minor)
        :type  t_minor: str or int
        :param t_rel:   target server version (release)
        :type  t_rel:   str or int

        :return: True if server version is greater or equal (>=) than the
                 specified version. False if server version is lower (<) than
                 the specified version.
        :rtype:  boolean
        """
        version = self.get_version()
        if version is not None:
            return version >= [int(t_major), int(t_minor), int(t_rel)]
        else:
            return False

    def exec_query(self, query, options=None, exec_timeout=0):
        """Execute a query and return result.

        This is the singular method to execute queries. It should be the only
        method used as it contains critical error code to catch the issue
        with mysql.connector throwing an error on an empty result set.

        Notes:
            - It handles exception if query fails.
            - If 'fetch' is False in the options, the method returns the
              cursor instance.
            - By default a commit is performed at the end, unless 'commit'
              is set to False in the options.

        :param query:    Query object or string with the SQL statement to
                         execute.
        :type  query:    Query
        :type  query:    str
        :param options:      Options to control the execution of the statement.
                             The follow values are supported:
            columns        Add column headings as first row. By default, False.
            fetch          Execute the fetch as part of the operation and
                           use a buffered cursor. By default, True.
            raw            If True use a buffered raw cursor, meaning that
                           all returned values are strings (i.e., not converted
                           to the corresponding Python type). By default, True.
            commit         Perform a commit (if needed) automatically at the
                           end. By default, True.
        :type  options:      dictionary
        :param exec_timeout: Timeout value in seconds to kill the query
                             execution if exceeded. Value must be greater than
                             zero for this feature to be enabled. By default 0,
                             meaning that the query will not be killed.
        :type  exec_timeout: integer

        :return: List of rows (tuples) or Cursor with the result of the query.
        :rtype:  list of tuples or Cursor object

        :raise GadgetCnxError: If an error occurs with the server connection
                               or creating the cursor.
        :raise GadgetQueryError: If an error occurs when excuting the
                                 statement (query), fetching results or
                                 committing changes.
        """
        if options is None:
            options = {}
        columns = options.get('columns', False)
        fetch = options.get('fetch', True)
        raw = options.get('raw', True)
        do_commit = options.get('commit', True)

        if isinstance(query, str):
            query = Query(query)

        # Guard for connect() prerequisite
        if not self.db_conn:
            raise GadgetCnxError(
                "You must call connect before executing a query.", server=self)

        # Execute query, handling parameters.
        q_killer = None
        try:
            if exec_timeout > 0:
                if query.params():
                    raise NotImplementedError()
                # Spawn thread to kill query if timeout is reached.
                # Note: set it as daemon to avoid waiting for it on exit.
                q_killer = QueryKillerThread(self, query.query(), exec_timeout)
                q_killer.daemon = True
                q_killer.start()
            # Execute query.
            cur = None

            _LOGGER.debug("MySQL query: " + query.log())
            if query.params():
                cur = MySQLUtilsCursorResult(
                    self.db_conn.run_sql(query.query(), query.params()))
            else:
                cur = MySQLUtilsCursorResult(
                    self.db_conn.run_sql(query.query()))
        except mysqlsh.DBError as err:
            if cur:
                cur.close()
            if err.args[0] == CR_SERVER_LOST and exec_timeout > 0:
                # If the connection is killed (because the execution timeout is
                # reached), then it attempts to re-establish it (to execute
                # further queries) and raise a specific exception to track this
                # event.
                # CR_SERVER_LOST = Errno 2013 Lost connection to MySQL server
                # during query.
                self.connect()
                raise GadgetQueryError("Timeout executing query", query.log(),
                                       errno=err.args[0], cause=err, server=self)
            else:
                raise GadgetQueryError("Query failed. {0}".format(err),
                                       query.log(), errno=err.args[0], cause=err,
                                       server=self)
        except Exception as err:
            if cur:
                cur.close()
            raise GadgetQueryError("Unknown error: {0}".format(err),
                                   query.log(), errno=0, cause=err,
                                   server=self)
        finally:
            # Stop query killer thread if alive.
            if q_killer and q_killer.is_alive():
                q_killer.stop()

        # Fetch rows (only if available or fetch = True).
        if cur.with_rows:
            if fetch or columns:
                try:
                    results = cur.fetchall()
                    if columns:
                        col_headings = cur.column_names
                        col_names = []
                        for col in col_headings:
                            col_names.append(col)
                        results = col_names, results
                except mysqlsh.DBError as err:
                    raise GadgetQueryError(
                        "Error fetching all query data: {0}".format(err),
                        query.log(), errno=err.args[0], cause=err, server=self)
                finally:
                    cur.close()
                return results
            else:
                # Return cursor to fetch rows elsewhere (fetch = false).
                return cur
        else:
            # No results (not a SELECT)
            try:
                if do_commit:
                    self.db_conn.run_sql("commit")
            except mysqlsh.DBError as err:
                raise GadgetQueryError(
                    "Error performing commit: {0}".format(err), query.log(),
                    errno=err.args[0], cause=err, server=self)
            finally:
                cur.close()
            return cur

    def commit(self):
        """Perform a COMMIT.

        :raise GadgetCnxError: If connection was not previously established.
        """
        # Guard for connect() prerequisite
        if not self.db_conn:
            raise GadgetCnxError(
                "You must call connect before commit.", server=self)

        self.db_conn.run_sql("commit")

    def rollback(self):
        """Perform a ROLLBACK.

        :raise GadgetCnxError: If connection was not previously established.
        """
        # Guard for connect() prerequisite
        if not self.db_conn:
            raise GadgetCnxError(
                "You must call connect before rollback.", server=self)

        self.db_conn.run_sql("rollback")

    def show_server_variable(self, variable):
        """Get the variable information using SHOW VARIABLES statement.

        Return one or more rows from the SHOW VARIABLES statement according
        to the specified variable parameter.

        :param variable:   The variable name or wildcard string to match the
                           returned variable information.
        :type  variable:   string

        :returns: Variables names and respective value matching the specified
                  variable parameter.
        :rtype:   list of tuples

        :raise GadgetServerError: If an error occur getting the variable value.
        """
        return self.exec_query(Query("SHOW VARIABLES LIKE ?", variable))

    def set_variable(self, var_name, var_value, var_type="session"):
        """Set server variable using the SET statement.

        This function sets the value of system variables using the SET
        statement.

        :param var_name: Name of the variable to set.
        :type  var_name: str
        :param var_value: value to which we want to set the variable.
        :type  var_value: str
        :param var_type: Type of the variable ('session', 'global', 'persist'
                         'persist_only'). By default session is used.
        :type  var_type: str
        :raises GadgetDBError: if an invalid var_type is provided as argument.
        """

        if var_type.lower() in ('global', 'session', 'persist',
                                'persist_only'):
            var_type = '{0}.'.format(var_type)  # Add dot (.)
        else:
            raise GadgetError("Invalid variable type: {0}. Supported types: "
                              "'global' and 'session'.".format(var_type))

        # Execute SET @@var_type.var_name.
        self.exec_query("SET @@{0}{1}={2}".format(var_type, var_name,
                                                  var_value))

    def select_variable(self, var_name, var_type=None):
        """Get server system variable value using SELECT statement.

        This function displays the value of system variables using the SELECT
        statement. This can be used as a workaround for variables with very
        long values, as SHOW VARIABLES is subject to a version-dependent
        display-width limit.

        Note: Some variables may not be available using SELECT @@var_name, in
        such cases use SHOW VARIABLES LIKE 'var_name'.

        :param var_name: Name of the variable to display.
        :type  var_name: string
        :param var_type: Type of the variable ('session' or 'global'). By
                         default None (no type is used), meaning that the
                         session value is returned if it exists and the global
                         value otherwise.
        :type  var_type: 'session', 'global', '', or None

        :return: value for the given server system variable.
        :rtype:  string

        :raise GadgetServerError: If an unsupported variable type is
                                  specified.
        :raise GadgetQueryError: If the variable does not exist in the server.
        """
        if var_type is None:
            var_type = ''
        elif var_type.lower() in ('global', 'session', ''):
            var_type = '{0}.'.format(var_type)  # Add dot (.)
        else:
            raise GadgetServerError(
                "Invalid variable type: {0}. Supported types: "
                "'global' and 'session'.".format(var_type), server=self)
        # Execute SELECT @@[var_type.]var_name.
        # Note: An error is issued if the given variable is not known.
        res = self.exec_query("SELECT @@{0}{1}".format(var_type, var_name))
        return res[0][0]

    def has_default_value(self, var_name):
        """Check if the variable has the default value or was already change.

        Return a boolean value indicating if the variable is set with the
        compiled default value (true), or if it was already changed explicitly
        by the user somehow (false), i.e. variable changed with the SET
        statement, a command line option, or the configuration file.

        NOTE: This method requires the performance_schema to be enabled.

        :param var_name: Name of the target variable to check.
        :type var_name: string
        :return: True if the variable value is the compiled default one, or
                 False otherwise (meaning that the variable value was already
                 changed by the user).
        :rtype: boolean
        """
        res = self.exec_query(Query("SELECT variable_source "
                                    "FROM performance_schema.variables_info "
                                    "WHERE variable_name=?", var_name))
        if res[0][0] == 'COMPILED':
            return True
        else:
            return False

    def flush_logs(self, log_type=None):
        """Execute the FLUSH [log_type] LOGS statement.

        Reload internal logs cache and closes and reopens all log files, or
        only of the specified log_type.

        Note: The log_type option is available starting from MySQL 5.5.3.

        :param log_type: Type of the log files to be flushed. Supported values:
                         BINARY, ENGINE, ERROR, GENERAL, RELAY, SLOW.
        :type  log_type: string

        :raise GadgetServerError: If an error occurs when executing the FLUSH
                                  LOGS statement.
        """
        if log_type:
            self.exec_query("FLUSH {0} LOGS".format(log_type))
        else:
            self.exec_query("FLUSH LOGS")

    def supports_gtid(self):
        """Check if the server supports GTIDs.

        :return: True if GTID is supported and turned on and False if
                 supported but not enabled.
        :rtype:  boolean
        :raise GadgetServerError: If GTID is not supported or the GTID mode
                                  cannot be obtained.
        """
        # Check server for GTID support
        version_ok = self.check_version_compat(5, 6, 5)
        if not version_ok:
            raise GadgetServerError("GTIDs are only supported starting from"
                                    "MySQL 5.6.5", server=self)
        try:
            res = self.exec_query("SELECT @@GLOBAL.GTID_MODE")
        except (GadgetCnxError, GadgetQueryError) as err:
            raise GadgetServerError(
                "Unable to get GTID_MODE value: {0}".format(err.errmsg),
                cause=err, server=self)
        # Return result
        if res[0][0] == 'ON':
            return True
        elif res[0][0] == 'OFF':
            return False
        else:
            raise GadgetServerError(
                "Unexpected value for @@GLOBAL.GTID_MODE: {0}."
                "Expected: 'ON' or 'OFF'.".format(res[0][0]), server=self)

    def get_gtid_executed(self, skip_gtid_check=True):
        """Get the executed GTID set of the server.

        This function retrieves the (current) GTID_EXECUTED set of the server.

        :param skip_gtid_check:  Flag indicating if the check for GTID support
                                will be skipped or not. By default 'True'
                                (check is skipped).

        :returns a string with the GTID_EXECUTED set for this server.
        :rtype str
        :raises GadgetError: if GTIDs are not supported or not enabled.
        """
        if not skip_gtid_check:
            # Check server for GTID support.
            gtid_support = self.supports_gtid()
            if not gtid_support:
                raise GadgetServerError("Global Transaction IDs are not "
                                        "supported.", server=self)
        # Get GTID_EXECUTED.
        try:
            return self.exec_query("SELECT @@GLOBAL.GTID_EXECUTED")[0][0]
        except GadgetQueryError:
            if skip_gtid_check:
                # Query likely failed because GTIDs are not supported,
                # therefore skip error in this case.
                return ""
            else:
                # If GTID check is not skipped re-raise exception.
                raise
        except IndexError:
            # If no rows are returned by query then return an empty string.
            return ""

    def supports_plugin(self, plugin, state='ACTIVE'):
        """Check if the given plugin is supported.

        Check if the server supports the specified plugin. Return True if
        plugin is installed and active.

        :param plugin: Name of plugin to check
        :type  plugin: string
        :param state: the expected plugin state to check, by default ACTIVE.
        :type  state: string

        :returns: True if the plugin is supported and it has the given state,
                  False otherwise.
        :rtype:   boolean

        :raise GadgetServerError: If an error occurs when checking for the
                                  plugin support.
        """
        _PLUGIN_QUERY = Query(
            "SELECT PLUGIN_NAME, PLUGIN_STATUS "
            "FROM INFORMATION_SCHEMA.PLUGINS WHERE PLUGIN_NAME LIKE ?", "{0}%".format(plugin))
        res = self.exec_query(_PLUGIN_QUERY)
        if not res:
            # plugin not found.
            _LOGGER.debug("Plugin %s is not installed", plugin)
            return False

        elif res[0][1].upper() != state.upper():
            # The state is not the desired.
            _LOGGER.debug("Plugin %s has state: %s and not the expected: %s",
                          plugin, res[0][1], state)
            return False
        return True

    def get_all_databases(self, ignore_internal_dbs=True):
        """Get all databases from the server.

        Retrieve the list of all databases on the server, except for internal
        databases (e.g., INFORMATION_SCHEMA and PERFORMANCE_SCHEMA) if
        the 'ignore_internal_dbs' parameter is set to True.

        Note: New internal database 'sys' added by default for MySQL 5.7.7+.

        :param ignore_internal_dbs: Ignore internal databases.
        :type  ignore_internal_dbs: boolean

        :returns: Result with the name of all the databases in the server.
        :rtype:   list of tuples

        :raise GadgetServerError: If an error occurs when getting the list of
                                  all databases.
        """

        if ignore_internal_dbs:
            _GET_DATABASES = """
            SELECT SCHEMA_NAME
            FROM INFORMATION_SCHEMA.SCHEMATA
            WHERE SCHEMA_NAME != 'INFORMATION_SCHEMA'
            AND SCHEMA_NAME != 'PERFORMANCE_SCHEMA'
            """
            # Starting from MySQL 5.7.7, sys schema is installed by default.
            if self.check_version_compat(5, 7, 7):
                _GET_DATABASES = ("{0} AND SCHEMA_NAME != 'sys'"
                                  "".format(_GET_DATABASES))
        else:
            _GET_DATABASES = """
            SELECT SCHEMA_NAME
            FROM INFORMATION_SCHEMA.SCHEMATA
            """

        return self.exec_query(_GET_DATABASES)

    def read_and_exec_sql(self, input_file, verbose=False):
        """Read an input file containing SQL statements and execute them.

        :param input_file: The full path to the SQL file.
        :type  input_file: string
        :param verbose:    Log the read statements. By default, False.
        :type  verbose:    boolean

        :raise GadgetServerError: If an error occurs when executing the
                                  statements in the SQL file.
        """
        with open(input_file) as f_input:
            while True:
                cmd = f_input.readline()
                if not cmd:
                    break
                if len(cmd) > 1:
                    if cmd[0] != '#':
                        if verbose:
                            _LOGGER.debug("%s", cmd)
                        query_options = {
                            'fetch': False
                        }
                        self.exec_query(cmd, query_options)

    def binlog_enabled(self):
        """Check binary logging status.

        Check if binary logging is enabled on the server based on the value
        of the 'log_bin' variable.

        :return: False if binary logging is disabled and True otherwise.
        :rtype:  boolean

        :raise GadgetServerError: If an error occur when trying to get the
                                  value of the 'log_bin' variable.
        """
        try:
            res = self.show_server_variable("log_bin")
        except GadgetServerError as err:
            raise GadgetServerError("Cannot get value of 'log_bin' variable. "
                                    "{0}".format(err.errmsg), cause=err,
                                    server=self)
        if not res:
            raise GadgetServerError(
                "No value returned for 'log_bin' variable.", server=self)
        if res[0][1] in ("OFF", "0"):
            return False
        return True

    def toggle_global_read_lock(self, enable=True):
        """
        Enable or disable read-only mode on the server.

        Note: user must have SUPER privilege

        :param enable: Indicate if read-only mode will be enabled (True) or
                       disabled (False). If True (default) then flush all
                       tables with read lock and set the [super_]read_only to
                       'ON'. If False then set [super_]read_only  mode to 'OFF'
                       and unlock all tables.
        :type enable: boolean
        """

        # Starting with version 5.7.8 MySQL has a new super_read_only system
        # variable
        has_super_read_only = self.check_version_compat(5, 7, 8)
        var_name = "super_read_only" if has_super_read_only else "read_only"

        if enable:
            self.exec_query("FLUSH TABLES WITH READ LOCK")
            self.set_variable(var_name, "ON", "global")
        else:
            self.set_variable(var_name, "OFF", "global")
            self.exec_query("UNLOCK TABLES")

    def toggle_binlog(self, action='disable'):
        """Enable or disable binary logging.

        Note: User must have SUPER privilege.

        :param action: if 'disable' then turn off the binary logging.
                       if 'enable' then turn on binary logging. If none
                       of the previous action is specified then nothing is
                       done (no action).
        :type  action: 'disable' or 'enable'

        :raise GadgetServerError: If an error occur when setting the
                                  SQL_LOG_BIN value.
        """
        if action.lower() == 'disable':
            self.exec_query("SET SQL_LOG_BIN=0")
        elif action.lower() == 'enable':
            self.exec_query("SET SQL_LOG_BIN=1")

    def get_server_id(self):
        """Retrieve the server ID.

        :return: Value of the 'server_id' variable.
        :rtype:  integer

        :raise GadgetServerError: If an error occurs when trying to retrieve
                                  the server ID.
        """
        try:
            res = self.show_server_variable("server_id")
        except GadgetServerError as err:
            raise GadgetServerError("Cannot retrieve 'server_id'. "
                                    "{0}".format(err.errmsg), cause=err,
                                    server=self)
        return int(res[0][1])

    def get_server_uuid(self):
        """Retrieve the UUID of the server.

        :return: Value of the 'server_uuid' variable. If not available None
                 is returned.
        :rtype:  string

        :raises GadgetServerError: If an errors occurs when trying to retrieve
                                   the server UUID.
        """
        try:
            res = self.show_server_variable("server_uuid")
            if res is None or res == []:
                return None
        except GadgetServerError as err:
            raise GadgetServerError("Cannot retrieve 'server_uuid': "
                                    "{0}".format(err.errmsg), cause=err,
                                    server=self)
        return res[0][1]

    def grant_tables_enabled(self):
        """Check if grant tables is enabled on the server.

        In other words, this functions checks if the privileges system is
        enabled on the server. Grant tables might be disabled if the
        --skip-grant-tables option is used when starting the server.

        :return: True if grant tables (privileges system) is enabled otherwise
                 False (disabled).
        :rtype:  boolean
        """
        if self.grants_enabled is None:
            try:
                self.exec_query("SHOW GRANTS FOR 'snuffles'@'host'")
                self.grants_enabled = True
            except (GadgetCnxError, GadgetQueryError) as err:
                if (err.args[0] == ER_OPTION_PREVENTS_STATEMENT and
                        "--skip-grant-tables" in err.errmsg):
                    self.grants_enabled = False
                # Ignore other errors as they are not pertinent to the check
                else:
                    self.grants_enabled = True
        return self.grants_enabled

    def get_server_binlogs_list(self, include_size=False):
        """Get the binary log file names listed on a server.

        Obtains the binlog file names available on the server by using the
        'SHOW BINARY LOGS' statement, returning these file names as a list.

        :param include_size: Indicate if the returning list shall include the
                             size of the file.
        :type  include_size: boolean

        :return: List with the binary log file names available on the server.
        :rtype:  list

        :raises GadgetServerError: If an errors occurs when executing SHOW
                                   BINARY LOGS.
        """
        res = self.exec_query("SHOW BINARY LOGS")

        server_binlogs = []
        for row in res:
            if include_size:
                server_binlogs.append(row)
            else:
                server_binlogs.append(row[0])
        return server_binlogs

    def is_plugin_installed(self, plugin_name, is_active=False,
                            silence_warnings=False):
        """Test if the given plugin is installed/loaded in this server.

        :param plugin_name: The name of the plugin to load.
        :type plugin_name:  str.
        :param is_active: If True verifies the plugin is also in 'ACTIVE'
                          state.
        :type is_active:  bool
        :param silence_warnings: If True avoids logging warning messages.
        :type silence_warnings:  bool (False by default)

        :return: True if the plugin is installed otherwise False.
        :rtype: bool.
        """
        res = self.exec_query("show plugins")
        for row in res:
            if plugin_name in row[0]:
                if is_active:
                    return True if "ACTIVE" in row[1] else False
                else:
                    return True
        if not silence_warnings:
            _LOGGER.warning("The %s plugin has not been "
                            "installed/loaded in %s", plugin_name, self)
        return False

    def install_plugin(self, plugin_name):
        """Attempts to install/load the given plugin in this server.

        :param plugin_name: The name of the plugin to load.
        :type plugin_name:  str.

        :raise GadgetServerError: If the GR plugin could not be loaded.

        :return:  True if the plugin is successfully loaded or was already
                  loaded, else an exception is raised.
        :rtype:   boolean

        :raise GadgetServerError: If the plugin could not be installed.
        """
        msg = "Initializing {0} plugin on {1}".format(plugin_name, self)
        _LOGGER.info(msg)
        if self.is_plugin_installed(plugin_name, silence_warnings=True):
            return True
        else:
            try:
                if "WIN" in self.select_variable("version_compile_os").upper():
                    ext = ".dll"
                else:
                    ext = ".so"
                self.exec_query("INSTALL PLUGIN {plugin_name} SONAME "
                                "'{plugin_name}{ext}'"
                                "".format(plugin_name=plugin_name, ext=ext))
                _LOGGER.debug("The %s plugin has been successfully install "
                              "in server: %s", plugin_name, self)
            except GadgetQueryError as err:
                if "already exists" in err.errmsg:
                    _LOGGER.debug("The %s plugin is already installed: %s"
                                  "", plugin_name, err.errmsg)
                else:
                    _LOGGER.error("An error was found trying to install the "
                                  "%s plugin: %s", plugin_name, err.errmsg)
                    raise GadgetServerError("The {0} plugin could not be "
                                            "loaded in the server {1}"
                                            "".format(plugin_name, self),
                                            cause=err)
        return True

    def uninstall_plugin(self, plugin_name):
        """Attempts to uninstall the given plugin in this server.

        :param plugin_name: The name of the plugin to unload.
        :type plugin_name:  str.

        :raise GadgetServerError: If the plugin could not be uninstalled.
        """
        try:
            self.exec_query("UNINSTALL PLUGIN {plugin}"
                            "".format(plugin=plugin_name))
        except GadgetQueryError as err:
            if "does not exist" in err.errmsg:
                _LOGGER.debug("The %s plugin is not installed: %s"
                              "", plugin_name, err.errmsg)
            else:
                _LOGGER.error("An error was found trying to uninstall the %s "
                              "plugin: %s", plugin_name, err.errmsg)
                raise GadgetServerError("The {0} plugin could not be "
                                        "uninstalled in the server {1}"
                                        "".format(plugin_name, self),
                                        cause=err)

    def start_plugin(self, plugin_name):
        """Starts the given plugin
        :param plugin_name: The name of the plugin to load.
        :type plugin_name:  str.
        """
        self.exec_query("START {plugin}".format(plugin=plugin_name))

    def stop_plugin(self, plugin_name):
        """Stops the given plugin
        :param plugin_name: The name of the plugin to load.
        :type plugin_name:  str.
        """
        self.exec_query("STOP {plugin}".format(plugin=plugin_name))

    def __str__(self):
        """String representation of the class Server

        :return: representation the server with information of the host
                 and port.
        :rtype:  string
        """
        if self.socket and os.name == "posix":
            return "'{0}:{1}'".format(self.host, self.socket)
        else:
            return "'{0}:{1}'".format(self.host, self.port)


class QueryKillerThread(threading.Thread):
    """Class to run a thread to kill an executing query.

    This class is used to spawn a thread than will kill the execution
    (connection) of a query upon reaching a given timeout.
    """

    def __init__(self, server, query, timeout):
        """Constructor.

        :param server:  Server instance where the target query is executed.
        :type  server:  Server object
        :param query:   Target query to kill.
        :type  query:   string
        :param timeout: Timeout value in seconds used to kill the query when
                        reached.
        :type  timeout: integer
        """
        threading.Thread.__init__(self)
        self._stop_event = threading.Event()
        self._query = query
        self._timeout = timeout
        self._server = server
        self._connection = server.get_connection()
        server.get_version()

    def run(self):
        """Main execution of the query killer thread.
        Stop the thread if instructed as such
        """
        connector_error = None
        # Kill the query connection upon reaching the given execution timeout.
        while not self._stop_event.is_set():
            # Wait during the defined time.
            self._stop_event.wait(self._timeout)
            # If the thread was asked to stop during wait, it does not try to
            # kill the query.
            if not self._stop_event.is_set():
                try:
                    cur = None
                    # Get process information from threads table when available
                    # (for versions > 5.6.1), since it does not require a mutex
                    # and has minimal impact on server performance.
                    cur = MySQLUtilsCursorResult(self._connection.run_sql(
                        "SELECT processlist_id "
                        "FROM performance_schema.threads"
                        " WHERE processlist_command='Query'"
                        " AND processlist_info='{0}'".format(self._query)))
                    result = cur.fetchall()

                    try:
                        process_id = result[0][0]
                    except IndexError:
                        # No rows are returned if the query ended in the
                        # meantime.
                        process_id = None

                    # Kill the connection associated to que process id.
                    # Note: killing the query will not work with
                    # connector-python,since it will hang waiting for the
                    #  query to return.
                    if process_id:
                        self._connection.run_sql("KILL {0}".format(process_id))
                except mysqlsh.DBError as err:
                    # Hold error to raise at the end.
                    connector_error = err
                finally:
                    # Close cursor if available.
                    if cur:
                        cur.close()
                # Stop this thread.
                self.stop()

        # Close connection.
        try:
            self._connection.disconnect()
        except mysqlsh.DBError:
            # Only raise error if no previous error has occurred.
            if not connector_error:
                raise
        finally:
            # Raise any previous error that already occurred.
            if connector_error is not None:
                # pylint: disable=E0702
                raise connector_error

    def stop(self):
        """Stop the thread.

        Set the event flag for the thread to stop as soon as possible.
        """
        self._stop_event.set()


class LocalErrorLog(object):
    """This class can be used to read the local Error log file.
    Note: In the case the log_error variable is set to "stderr", then it will
    not be possible to retrieve the messages.
    """

    def __init__(self, server, raise_error=False):
        """Constructor

        Create the LocalErrorLog object setting the server to retrieve
        messages logged to its error log file.

        :param server: The server to retrieve messages from the error log.
        :type server:  mysql_gadgets.common.server.Server

        :raise GadgetServerError: If the "log_error" var is set to stderr
                                  instead of a file.
        """
        self.log_error = server.select_variable("log_error")
        if self.log_error == "stderr":
            if raise_error:
                raise GadgetServerError("The log error is set to stderr")
            self._log_file = None
        else:
            datadir = server.select_variable("datadir")
            self._log_file = get_abs_path(self.log_error, datadir)

    def get_size(self):
        """Get the current size of the log error file.

        Returns the current size of the server log error file. This method
        can be used as a starting point to read from the file.

        :return: Size of the error log file.
        :rtype: int
        """
        if self._log_file:
            return os.stat(self._log_file).st_size
        return None

    def read(self, offset, errors_only=True):
        """Reads the Error log file.

        :param offset: Starting position.
        :type offset:  int
        :param errors_only: If the messages returned should be "ERROR"
                            messages only, By default True.
        :type errors_only:  boolean

        :return: Messages logged at the logging file.
        :rtype: str
        """
        if self._log_file:
            with open(self._log_file, "r") as e_file:
                if offset:
                    e_file.seek(offset)
                result = []
                for line in e_file:
                    if errors_only and "[ERROR]" not in line:
                        continue
                    result.append(line)
                return "".join(result)
        return ""
