in providers/mysql/src/airflow/providers/mysql/hooks/mysql.py [0:0]
def _get_conn_config_mysql_client(self, conn: Connection) -> dict:
conn_config = {
"user": conn.login,
"passwd": conn.password or "",
"host": conn.host or "localhost",
"db": self.schema or conn.schema or "",
}
# check for authentication via AWS IAM
if conn.extra_dejson.get("iam", False):
conn_config["passwd"], conn.port = self.get_iam_token(conn)
conn_config["read_default_group"] = "enable-cleartext-plugin"
conn_config["port"] = int(conn.port) if conn.port else 3306
if conn.extra_dejson.get("charset", False):
conn_config["charset"] = conn.extra_dejson["charset"]
if conn_config["charset"].lower() in ("utf8", "utf-8"):
conn_config["use_unicode"] = True
if conn.extra_dejson.get("cursor", False):
try:
import MySQLdb.cursors
except ImportError:
raise RuntimeError(
"You do not have `mysqlclient` package installed. "
"Please install it with `pip install mysqlclient` and make sure you have system "
"mysql libraries installed, as well as well as `pkg-config` system package "
"installed in case you see compilation error during installation."
)
cursor_type = conn.extra_dejson.get("cursor", "").lower()
# Dictionary mapping cursor types to their respective classes
cursor_classes = {
"sscursor": MySQLdb.cursors.SSCursor,
"dictcursor": MySQLdb.cursors.DictCursor,
"ssdictcursor": MySQLdb.cursors.SSDictCursor,
}
# Set the cursor class in the connection configuration based on the cursor type
if cursor_type in cursor_classes:
conn_config["cursorclass"] = cursor_classes[cursor_type]
if conn.extra_dejson.get("ssl", False):
# SSL parameter for MySQL has to be a dictionary and in case
# of extra/dejson we can get string if extra is passed via
# URL parameters
dejson_ssl = conn.extra_dejson["ssl"]
if isinstance(dejson_ssl, str):
dejson_ssl = json.loads(dejson_ssl)
conn_config["ssl"] = dejson_ssl
if conn.extra_dejson.get("ssl_mode", False):
conn_config["ssl_mode"] = conn.extra_dejson["ssl_mode"]
if conn.extra_dejson.get("unix_socket"):
conn_config["unix_socket"] = conn.extra_dejson["unix_socket"]
if self.local_infile:
conn_config["local_infile"] = 1
if self.init_command:
conn_config["init_command"] = self.init_command
return conn_config