in redshift_connector/iam_helper.py [0:0]
def set_iam_credentials(info: RedshiftProperty) -> None:
    """
    Select a credentials provider for ``info``, copy identity-provider
    metadata onto ``info``, then call ``IamHelper.set_cluster_credentials``.

    Provider selection:

    * ``info.credentials_provider`` is ``None``: an ``AWSCredentialsProvider``
      is created.
    * otherwise the value is handed to ``dynamic_plugin_import`` exactly as
      given and, if that attempt raises ``AttributeError`` or
      ``ModuleNotFoundError``, a second time with the
      ``redshift_connector.plugin.`` prefix. When the prefixed attempt
      succeeds, ``info.credentials_provider`` is overwritten with the
      prefixed name.

    Raises ``InterfaceError`` when both attempts fail.
    """
    plugin_cls: typing.Optional[SamlCredentialsProvider] = None
    provider: typing.Union[SamlCredentialsProvider, AWSCredentialsProvider]

    if info.credentials_provider is None:  # indicates AWS Credentials will be used
        _logger.debug("AWS Credentials provider will be used for authentication")
        provider = AWSCredentialsProvider()
        provider.add_parameter(info)
    else:
        # NOTE(review): the caller-supplied name is tried unmodified, before
        # the package's own plugin namespace. dynamic_plugin_import is not
        # shown here; presumably it imports by dotted path, so whoever sets
        # this connection parameter chooses which code is loaded. It should
        # only ever come from trusted configuration -- confirm with callers.
        try:
            plugin_cls = dynamic_plugin_import(info.credentials_provider)
            provider = plugin_cls()  # type: ignore
            provider.add_parameter(info)  # type: ignore
        except (AttributeError, ModuleNotFoundError):
            # This handler also covers the constructor and add_parameter, so
            # an AttributeError raised inside a plugin is treated the same as
            # a plugin that could not be found.
            _logger.debug("Failed to load user defined plugin: {}".format(info.credentials_provider))
            try:
                bundled_name: str = "redshift_connector.plugin.{}".format(info.credentials_provider)
                plugin_cls = dynamic_plugin_import(bundled_name)
                provider = plugin_cls()  # type: ignore
                provider.add_parameter(info)  # type: ignore
                # Record the fully qualified name that actually loaded.
                info.put("credentials_provider", bundled_name)
            except (AttributeError, ModuleNotFoundError):
                _logger.debug(
                    "Failed to load pre-defined IdP plugin from redshift_connector.plugin: {}".format(
                        info.credentials_provider
                    )
                )
                raise InterfaceError("Invalid credentials provider " + info.credentials_provider)

    if isinstance(provider, SamlCredentialsProvider):
        holder: CredentialsHolder = provider.get_credentials()
        iam_metadata: CredentialsHolder.IamMetadata = holder.get_metadata()

        if iam_metadata is not None:
            auto_create: bool = iam_metadata.get_auto_create()
            db_user: typing.Optional[str] = iam_metadata.get_db_user()
            saml_db_user: typing.Optional[str] = iam_metadata.get_saml_db_user()
            profile_db_user: typing.Optional[str] = iam_metadata.get_profile_db_user()
            idp_groups: typing.List[str] = iam_metadata.get_db_groups()
            force_lowercase: bool = iam_metadata.get_force_lowercase()
            allow_db_user_override: bool = iam_metadata.get_allow_db_user_override()

            # These two flags are only ever switched on from the metadata;
            # a False value leaves whatever info already holds.
            if auto_create is True:
                info.put("auto_create", auto_create)
            if force_lowercase is True:
                info.put("force_lowercase", force_lowercase)

            # Precedence for the database user. With the override flag set,
            # the value from get_saml_db_user() wins over get_db_user();
            # without it, the SAML value is only the last fallback.
            # NOTE(review): the override flag is itself read from the same
            # metadata object -- confirm where it originates before relying
            # on it as an access-control decision.
            if allow_db_user_override is True:
                user_candidates = (saml_db_user, db_user, profile_db_user)
            else:
                user_candidates = (db_user, profile_db_user, saml_db_user)
            selected_user = next((name for name in user_candidates if name is not None), None)
            if selected_user is not None:
                info.put("db_user", selected_user)

            # Groups from the metadata are applied only when info carries
            # none of its own; they never replace a non-empty list.
            if len(info.db_groups) == 0 and len(idp_groups) > 0:
                info.db_groups = [group.lower() for group in idp_groups] if force_lowercase else idp_groups

    IamHelper.set_cluster_credentials(provider, info)