def get_connection_string()

in src/SimpleReplay/replay.py [0:0]


def _split_cluster_endpoint(cluster_endpoint):
    """Split a ``host:port/database`` endpoint into ``(host, port, database)``.

    The split is done on the ``:`` and ``/`` delimiters rather than on a fixed
    dot-separated position, so the result does not depend on how many labels
    the hostname has. Parts that are absent come back as empty strings.
    """
    host, _, port_and_database = cluster_endpoint.partition(":")
    port, _, database = port_and_database.partition("/")
    return host, port, database


def get_connection_string(username, database=None, max_attempts=10, skip_cache=False):
    """Fetch temporary Redshift credentials for ``username`` and build connection info.

    Results are cached per user in ``g_credentials_cache`` for 30 minutes; a
    cached entry is returned as-is (regardless of ``database``) unless
    ``skip_cache`` is true or the entry has expired.

    Args:
        username: Database user passed as ``DbUser`` to ``get_cluster_credentials``.
        database: Database name to connect to. When falsy, the database named in
            ``g_config["target_cluster_endpoint"]`` is used.
        max_attempts: How many times to call ``get_cluster_credentials`` while
            the response carries no ``DbPassword``.
        skip_cache: When true, ignore any cached credentials for ``username``.

    Returns:
        A dict with the legacy keys ``odbc`` (ODBC connection string) and
        ``psql`` (dict of username/password/host/port/database), plus the flat
        keys ``username``, ``password``, ``host``, ``port``, ``database`` and
        ``odbc_driver``.

    Raises:
        ValueError: No ``database`` was given and the endpoint names none.
        CredentialsException: No AWS credentials were found, or no attempt
            returned a password.
        ClientError: Any service error other than an expired token or a
            missing cluster is logged and re-raised.
        SystemExit: The IAM token has expired or the cluster was not found
            (exit status -1).
    """
    credentials_timeout_sec = 3600
    retry_delay_sec = 10

    # how long to cache credentials per user
    cache_timeout_sec = 1800

    # check the cache
    if not skip_cache:
        record = g_credentials_cache.get(username)
        if record is not None:
            age_sec = (datetime.datetime.now(tz=datetime.timezone.utc) - record['last_update']).total_seconds()
            if age_sec < cache_timeout_sec:
                logger.debug(f'Using {username} credentials from cache')
                return record['target_cluster_urls']
            # pop() instead of del: tolerate the entry having been evicted already
            g_credentials_cache.pop(username, None)

    cluster_endpoint = g_config["target_cluster_endpoint"]
    odbc_driver = g_config["odbc_driver"]

    # The cluster identifier is the first dot-separated label of the endpoint.
    cluster_id = cluster_endpoint.split(".")[0]
    cluster_host, cluster_port, endpoint_database = _split_cluster_endpoint(cluster_endpoint)
    cluster_database = database or endpoint_database
    if not cluster_database:
        raise ValueError(
            f"No database given and none found in target cluster endpoint {cluster_endpoint!r}"
        )

    additional_args = {}
    endpoint_url = os.environ.get('ENDPOINT_URL')
    if endpoint_url:
        import urllib3
        # disable insecure warnings when testing endpoint is used
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        additional_args = {'endpoint_url': endpoint_url,
                           'verify': False}

    response = None
    rs_client = client("redshift", **additional_args)
    for attempt in range(1, max_attempts + 1):
        try:
            response = rs_client.get_cluster_credentials(
                DbUser=username, ClusterIdentifier=cluster_id, AutoCreate=False, DurationSeconds=credentials_timeout_sec
            )
        except NoCredentialsError as err:
            raise CredentialsException("No credentials found") from err
        except rs_client.exceptions.ClusterNotFoundFault:
            # Must precede the ClientError handler: botocore's modeled service
            # exceptions derive from ClientError, so the broader handler would
            # otherwise intercept this one.
            logger.error(f"Cluster {cluster_id} not found. Please confirm cluster endpoint, account, and region.")
            raise SystemExit(-1)
        except rs_client.exceptions.ClientError as e:
            error_code = e.response['Error']['Code']
            if error_code == 'ExpiredToken':
                logger.error(f"Error retrieving credentials for {cluster_id}: IAM credentials have expired.")
                raise SystemExit(-1)
            logger.error(f"Got exception retrieving credentials ({error_code})")
            raise

        if response is not None and response.get('DbPassword') is not None:
            break

        # No password in the response: wait and retry unless this was the last attempt.
        logger.warning(f"Failed to retrieve credentials for user {username} (attempt {attempt}/{max_attempts})")
        logger.debug(response)
        response = None
        if attempt < max_attempts:
            time.sleep(retry_delay_sec)

    if response is None:
        msg = f"Failed to retrieve credentials for {username}"
        raise CredentialsException(msg)

    db_user = response["DbUser"]
    db_password = response["DbPassword"]

    # The ODBC string uses only the second ':'-separated field of DbUser
    # (presumably dropping a prefix such as "IAM:" -- confirm against the API docs).
    cluster_odbc_url = (
        "Driver={}; Server={}; Database={}; IAM=1; DbUser={}; DbPassword={}; Port={}".format(
            odbc_driver,
            cluster_host,
            cluster_database,
            db_user.split(":")[1],
            db_password,
            cluster_port,
        )
    )

    cluster_psql = {
        "username": db_user,
        "password": db_password,
        "host": cluster_host,
        "port": cluster_port,
        "database": cluster_database,
    }

    credentials = {
        # old params
        'odbc': cluster_odbc_url,
        'psql': cluster_psql,
        # new params
        'username': db_user,
        'password': db_password,
        'host': cluster_host,
        'port': cluster_port,
        'database': cluster_database,
        'odbc_driver': odbc_driver,
    }
    logger.debug("Successfully retrieved database credentials for {}".format(username))
    g_credentials_cache[username] = {'last_update': datetime.datetime.now(tz=datetime.timezone.utc),
                                     'target_cluster_urls': credentials}
    return credentials