awswrangler/redshift/_connect.py (92 lines of code) (raw):

"""Amazon Redshift Connect Module (PRIVATE).""" from __future__ import annotations from typing import TYPE_CHECKING, Any import boto3 from awswrangler import _databases as _db_utils from awswrangler import _utils, exceptions if TYPE_CHECKING: try: import redshift_connector except ImportError: pass else: redshift_connector = _utils.import_optional_dependency("redshift_connector") def _validate_connection(con: "redshift_connector.Connection") -> None: if not isinstance(con, redshift_connector.Connection): raise exceptions.InvalidConnection( "Invalid 'conn' argument, please pass a " "redshift_connector.Connection object. Use redshift_connector.connect() to use " "credentials directly or wr.redshift.connect() to fetch it from the Glue Catalog." ) @_utils.check_optional_dependency(redshift_connector, "redshift_connector") def connect( connection: str | None = None, secret_id: str | None = None, catalog_id: str | None = None, dbname: str | None = None, boto3_session: boto3.Session | None = None, ssl: bool = True, timeout: int | None = None, max_prepared_statements: int = 1000, tcp_keepalive: bool = True, **kwargs: Any, ) -> "redshift_connector.Connection": """Return a redshift_connector connection from a Glue Catalog or Secret Manager. Note ---- You MUST pass a `connection` OR `secret_id`. Here is an example of the secret structure in Secrets Manager: { "host":"my-host.us-east-1.redshift.amazonaws.com", "username":"test", "password":"test", "engine":"redshift", "port":"5439", "dbname": "mydb" } https://github.com/aws/amazon-redshift-python-driver Parameters ---------- connection Glue Catalog Connection name. secret_id Specifies the secret containing the connection details that you want to retrieve. You can specify either the Amazon Resource Name (ARN) or the friendly name of the secret. catalog_id The ID of the Data Catalog. If none is provided, the AWS account ID is used by default. dbname Optional database name to overwrite the stored one. boto3_session The default boto3 session will be used if **boto3_session** is ``None``. ssl This governs SSL encryption for TCP/IP sockets. This parameter is forward to redshift_connector. https://github.com/aws/amazon-redshift-python-driver timeout This is the time in seconds before the connection to the server will time out. The default is None which means no timeout. This parameter is forward to redshift_connector. https://github.com/aws/amazon-redshift-python-driver max_prepared_statements This parameter is forward to redshift_connector. https://github.com/aws/amazon-redshift-python-driver tcp_keepalive If True then use TCP keepalive. The default is True. This parameter is forward to redshift_connector. https://github.com/aws/amazon-redshift-python-driver **kwargs Forwarded to redshift_connector.connect. e.g. ``is_serverless=True, serverless_acct_id='...', serverless_work_group='...'`` Returns ------- ``redshift_connector`` connection. Examples -------- Fetching Redshift connection from Glue Catalog >>> import awswrangler as wr >>> with wr.redshift.connect("MY_GLUE_CONNECTION") as con: ... with con.cursor() as cursor: ... cursor.execute("SELECT 1") ... print(cursor.fetchall()) Fetching Redshift connection from Secrets Manager >>> import awswrangler as wr >>> with wr.redshift.connect(secret_id="MY_SECRET") as con: ... with con.cursor() as cursor: ... cursor.execute("SELECT 1") ... print(cursor.fetchall()) """ attrs: _db_utils.ConnectionAttributes = _db_utils.get_connection_attributes( connection=connection, secret_id=secret_id, catalog_id=catalog_id, dbname=dbname, boto3_session=boto3_session ) if attrs.kind != "redshift": raise exceptions.InvalidDatabaseType( f"Invalid connection type ({attrs.kind}. It must be a redshift connection.)" ) return redshift_connector.connect( user=attrs.user, database=attrs.database, password=attrs.password, port=int(attrs.port), host=attrs.host, ssl=ssl, timeout=timeout, max_prepared_statements=max_prepared_statements, tcp_keepalive=tcp_keepalive, **kwargs, ) @_utils.check_optional_dependency(redshift_connector, "redshift_connector") def connect_temp( cluster_identifier: str, user: str, database: str | None = None, duration: int = 900, auto_create: bool = True, db_groups: list[str] | None = None, boto3_session: boto3.Session | None = None, ssl: bool = True, timeout: int | None = None, max_prepared_statements: int = 1000, tcp_keepalive: bool = True, **kwargs: Any, ) -> "redshift_connector.Connection": """Return a redshift_connector temporary connection (No password required). https://github.com/aws/amazon-redshift-python-driver Parameters ---------- cluster_identifier The unique identifier of a cluster. This parameter is case sensitive. user The name of a database user. database Database name. If None, the default Database is used. duration The number of seconds until the returned temporary password expires. Constraint: minimum 900, maximum 3600. Default: 900 auto_create Create a database user with the name specified for the user named in user if one does not exist. db_groups A list of the names of existing database groups that the user named in user will join for the current session, in addition to any group memberships for an existing user. If not specified, a new user is added only to PUBLIC. boto3_session The default boto3 session will be used if **boto3_session** is ``None``. ssl This governs SSL encryption for TCP/IP sockets. This parameter is forward to redshift_connector. https://github.com/aws/amazon-redshift-python-driver timeout This is the time in seconds before the connection to the server will time out. The default is None which means no timeout. This parameter is forward to redshift_connector. https://github.com/aws/amazon-redshift-python-driver max_prepared_statements This parameter is forward to redshift_connector. https://github.com/aws/amazon-redshift-python-driver tcp_keepalive If True then use TCP keepalive. The default is True. This parameter is forward to redshift_connector. https://github.com/aws/amazon-redshift-python-driver **kwargs Forwarded to redshift_connector.connect. e.g. is_serverless=True, serverless_acct_id='...', serverless_work_group='...' Returns ------- ``redshift_connector`` connection. Examples -------- >>> import awswrangler as wr >>> with wr.redshift.connect_temp(cluster_identifier="my-cluster", user="test") as con: ... with con.cursor() as cursor: ... cursor.execute("SELECT 1") ... print(cursor.fetchall()) """ client_redshift = _utils.client(service_name="redshift", session=boto3_session) args: dict[str, Any] = { "DbUser": user, "ClusterIdentifier": cluster_identifier, "DurationSeconds": duration, "AutoCreate": auto_create, } if db_groups is not None: args["DbGroups"] = db_groups else: db_groups = [] res = client_redshift.get_cluster_credentials(**args) cluster = client_redshift.describe_clusters(ClusterIdentifier=cluster_identifier)["Clusters"][0] return redshift_connector.connect( user=res["DbUser"], database=database if database else cluster["DBName"], password=res["DbPassword"], port=cluster["Endpoint"]["Port"], host=cluster["Endpoint"]["Address"], ssl=ssl, timeout=timeout, max_prepared_statements=max_prepared_statements, tcp_keepalive=tcp_keepalive, db_groups=db_groups, **kwargs, )