def set_iam_credentials()

in redshift_connector/iam_helper.py [0:0]


    def set_iam_credentials(info: RedshiftProperty) -> None:
        """
        Build the credentials provider selected by ``info`` and pass it, together
        with ``info``, to ``IamHelper.set_cluster_credentials``.

        When ``info.credentials_provider`` is ``None`` an ``AWSCredentialsProvider``
        is used. Otherwise the configured name is handed to
        ``dynamic_plugin_import`` as-is; if that fails, the same name is retried
        with a ``redshift_connector.plugin.`` prefix and, on success, the prefixed
        name is written back to ``info`` under ``credentials_provider``.

        If the resulting provider is a ``SamlCredentialsProvider`` and its
        credentials expose metadata, the ``auto_create``, ``force_lowercase``,
        ``db_user`` and ``db_groups`` settings found there are copied onto
        ``info`` (which is updated in place) before the final call.

        Raises ``InterfaceError`` when the plugin can be loaded neither directly
        nor from ``redshift_connector.plugin``.
        """
        plugin_cls: typing.Optional[SamlCredentialsProvider] = None
        provider: typing.Union[SamlCredentialsProvider, AWSCredentialsProvider]

        if info.credentials_provider is None:
            # No plugin configured: indicates AWS Credentials will be used.
            _logger.debug("AWS Credentials provider will be used for authentication")
            provider = AWSCredentialsProvider()
            provider.add_parameter(info)
        else:
            try:
                # First attempt: treat the configured value as a user-supplied plugin path.
                plugin_cls = dynamic_plugin_import(info.credentials_provider)
                provider = plugin_cls()  # type: ignore
                provider.add_parameter(info)  # type: ignore
            except (AttributeError, ModuleNotFoundError):
                _logger.debug("Failed to load user defined plugin: {}".format(info.credentials_provider))
                try:
                    # Second attempt: look the same name up inside redshift_connector.plugin.
                    packaged_plugin: str = "redshift_connector.plugin.{}".format(info.credentials_provider)
                    plugin_cls = dynamic_plugin_import(packaged_plugin)
                    provider = plugin_cls()  # type: ignore
                    provider.add_parameter(info)  # type: ignore
                    info.put("credentials_provider", packaged_plugin)
                except (AttributeError, ModuleNotFoundError):
                    _logger.debug(
                        "Failed to load pre-defined IdP plugin from redshift_connector.plugin: {}".format(
                            info.credentials_provider
                        )
                    )
                    raise InterfaceError("Invalid credentials provider " + info.credentials_provider)

        # Only SAML-style providers are asked for credentials metadata here.
        metadata: typing.Optional[CredentialsHolder.IamMetadata] = None
        if isinstance(provider, SamlCredentialsProvider):
            holder: CredentialsHolder = provider.get_credentials()
            metadata = holder.get_metadata()

        if metadata is not None:
            create_user: bool = metadata.get_auto_create()
            user_from_metadata: typing.Optional[str] = metadata.get_db_user()
            user_from_saml: typing.Optional[str] = metadata.get_saml_db_user()
            user_from_profile: typing.Optional[str] = metadata.get_profile_db_user()
            groups_from_metadata: typing.List[str] = metadata.get_db_groups()
            lowercase: bool = metadata.get_force_lowercase()
            override_allowed: bool = metadata.get_allow_db_user_override()

            if create_user is True:
                info.put("auto_create", create_user)

            if lowercase is True:
                info.put("force_lowercase", lowercase)

            # Candidates are listed from highest to lowest priority; the SAML
            # supplied user only wins when overriding is explicitly allowed.
            if override_allowed is True:
                user_candidates = (user_from_saml, user_from_metadata, user_from_profile)
            else:
                user_candidates = (user_from_metadata, user_from_profile, user_from_saml)

            chosen_user = next((name for name in user_candidates if name is not None), None)
            if chosen_user is not None:
                info.put("db_user", chosen_user)

            # Groups from the metadata are used only when info has none of its own.
            if len(info.db_groups) == 0 and len(groups_from_metadata) != 0:
                info.db_groups = (
                    [name.lower() for name in groups_from_metadata] if lowercase else groups_from_metadata
                )

        IamHelper.set_cluster_credentials(provider, info)