in connectors/sources/confluence.py [0:0]
def _get_session(self):
"""Generate and return base client session with configuration fields
Returns:
aiohttp.ClientSession: An instance of Client Session
"""
if self.session:
return self.session
self._logger.debug(f"Creating a '{self.data_source_type}' client session")
if self.data_source_type == CONFLUENCE_CLOUD:
auth = (
self.configuration["account_email"],
self.configuration["api_token"],
)
elif self.data_source_type == CONFLUENCE_SERVER:
auth = (
self.configuration["username"],
self.configuration["password"],
)
elif self.data_source_type == CONFLUENCE_DATA_CENTER:
auth = (
self.configuration["data_center_username"],
self.configuration["data_center_password"],
)
else:
msg = f"Unknown data source type '{self.data_source_type}' for Confluence connector"
self._logger.error(msg)
raise InvalidConfluenceDataSourceTypeError(msg)
basic_auth = aiohttp.BasicAuth(login=auth[0], password=auth[1])
timeout = aiohttp.ClientTimeout(total=None) # pyright: ignore
self.session = aiohttp.ClientSession(
auth=basic_auth,
headers={
"accept": "application/json",
"content-type": "application/json",
},
timeout=timeout,
raise_for_status=True,
)
return self.session