def _get_session()

in connectors/sources/confluence.py [0:0]


    def _get_session(self):
        """Return the client session, creating and caching it on first use.

        A truthy ``self.session`` is returned unchanged. Otherwise the basic-auth
        login and password are read from the configuration fields that belong
        to ``self.data_source_type``, a new session is built with JSON
        ``accept``/``content-type`` headers, stored on ``self.session`` and
        returned.

        Returns:
            aiohttp.ClientSession: The cached or newly created client session

        Raises:
            InvalidConfluenceDataSourceTypeError: If ``self.data_source_type``
                is not CONFLUENCE_CLOUD, CONFLUENCE_SERVER or
                CONFLUENCE_DATA_CENTER
        """
        if self.session:
            return self.session

        self._logger.debug(f"Creating a '{self.data_source_type}' client session")

        # Each deployment flavour keeps its credentials under different
        # configuration fields; pick the pair of field names first and read
        # the configuration only for the flavour that is actually in use.
        if self.data_source_type == CONFLUENCE_CLOUD:
            login_field, password_field = "account_email", "api_token"
        elif self.data_source_type == CONFLUENCE_SERVER:
            login_field, password_field = "username", "password"
        elif self.data_source_type == CONFLUENCE_DATA_CENTER:
            login_field, password_field = (
                "data_center_username",
                "data_center_password",
            )
        else:
            msg = f"Unknown data source type '{self.data_source_type}' for Confluence connector"
            self._logger.error(msg)

            raise InvalidConfluenceDataSourceTypeError(msg)

        login = self.configuration[login_field]
        password = self.configuration[password_field]
        credentials = aiohttp.BasicAuth(login=login, password=password)

        # total=None leaves the session without an overall request time limit.
        no_total_timeout = aiohttp.ClientTimeout(total=None)  # pyright: ignore
        json_headers = {
            "accept": "application/json",
            "content-type": "application/json",
        }

        self.session = aiohttp.ClientSession(
            auth=credentials,
            headers=json_headers,
            timeout=no_total_timeout,
            raise_for_status=True,
        )
        return self.session