in src/SimpleReplay/replay.py [0:0]
def get_connection_string(username, database=None, max_attempts=10, skip_cache=False):
credentials_timeout_sec = 3600
retry_delay_sec = 10
# how long to cache credentials per user
cache_timeout_sec = 1800
# check the cache
if not skip_cache and g_credentials_cache.get(username) is not None:
record = g_credentials_cache.get(username)
if (datetime.datetime.now(tz=datetime.timezone.utc) - record['last_update']).total_seconds() < cache_timeout_sec:
logger.debug(f'Using {username} credentials from cache')
return record['target_cluster_urls']
del g_credentials_cache[username]
cluster_endpoint = g_config["target_cluster_endpoint"]
odbc_driver = g_config["odbc_driver"]
cluster_endpoint_split = cluster_endpoint.split(".")
cluster_id = cluster_endpoint_split[0]
cluster_host = cluster_endpoint.split(":")[0]
cluster_port = cluster_endpoint_split[5].split("/")[0][4:]
cluster_database = database or cluster_endpoint_split[5].split("/")[1]
additional_args = {}
if os.environ.get('ENDPOINT_URL'):
import urllib3
# disable insecure warnings when testing endpoint is used
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
additional_args = {'endpoint_url': os.environ.get('ENDPOINT_URL'),
'verify': False}
response = None
rs_client = client("redshift", **additional_args)
for attempt in range(1, max_attempts + 1):
try:
response = rs_client.get_cluster_credentials(
DbUser=username, ClusterIdentifier=cluster_id, AutoCreate=False, DurationSeconds=credentials_timeout_sec
)
except NoCredentialsError:
raise CredentialsException("No credentials found")
except rs_client.exceptions.ClientError as e:
if e.response['Error']['Code'] == 'ExpiredToken':
logger.error(f"Error retrieving credentials for {cluster_id}: IAM credentials have expired.")
exit(-1)
else:
logger.error(f"Got exception retrieving credentials ({e.response['Error']['Code']})")
raise e
except rs_client.exceptions.ClusterNotFoundFault:
logger.error(f"Cluster {cluster_id} not found. Please confirm cluster endpoint, account, and region.")
exit(-1)
if response is None or response.get('DbPassword') is None:
logger.warning(f"Failed to retrieve credentials for user {username} (attempt {attempt}/{max_attempts})")
logger.debug(response)
response = None
if attempt < max_attempts:
time.sleep(retry_delay_sec)
else:
break
if response is None:
msg = f"Failed to retrieve credentials for {username}"
raise CredentialsException(msg)
cluster_odbc_url = (
"Driver={}; Server={}; Database={}; IAM=1; DbUser={}; DbPassword={}; Port={}".format(
odbc_driver,
cluster_host,
cluster_database,
response["DbUser"].split(":")[1],
response["DbPassword"],
cluster_port,
)
)
cluster_psql = {
"username": response["DbUser"],
"password": response["DbPassword"],
"host": cluster_host,
"port": cluster_port,
"database": cluster_database,
}
credentials = {# old params
'odbc': cluster_odbc_url,
'psql': cluster_psql,
# new params
'username': response['DbUser'],
'password': response['DbPassword'],
'host': cluster_host,
'port': cluster_port,
'database': cluster_database,
'odbc_driver': g_config["odbc_driver"]
}
logger.debug("Successfully retrieved database credentials for {}".format(username))
g_credentials_cache[username] = {'last_update': datetime.datetime.now(tz=datetime.timezone.utc),
'target_cluster_urls': credentials}
return credentials