# supporting-blog-content/onelake-connector-part-ii/connectors/sources/onelake.py (173 lines of code) (raw):
"""OneLake connector to retrieve data from datalakes"""
import asyncio
from functools import partial

from azure.identity import ClientSecretCredential
from azure.storage.filedatalake import DataLakeServiceClient

from connectors.source import BaseDataSource
# Default OneLake storage account name; used as the default value of the
# "account_name" setting, which forms the host part of the account URL.
ACCOUNT_NAME = "onelake"
class OneLakeDataSource(BaseDataSource):
    """OneLake data source.

    Lists the files stored directly under the configured ``data_path`` of a
    OneLake workspace and exposes their metadata and content.

    The Azure SDK clients used here are synchronous, so every call that
    performs network I/O is executed in the event loop's default executor
    to avoid stalling other coroutines.
    """

    name = "OneLake"
    service_type = "onelake"
    incremental_sync_enabled = True

    # Marker returned by ``next()`` when a blocking iterator is exhausted.
    # ``StopIteration`` itself cannot be propagated through a Future.
    _END_OF_ITERATION = object()

    def __init__(self, configuration):
        """Set up the connection to the azure base client

        Args:
            configuration (DataSourceConfiguration): Object of DataSourceConfiguration class.
        """
        super().__init__(configuration=configuration)

        # NOTE: "tentant_id" is a misspelling of "tenant_id". It is kept
        # because it is the published configuration key / attribute name.
        self.tentant_id = self.configuration["tentant_id"]
        self.client_id = self.configuration["client_id"]
        self.client_secret = self.configuration["client_secret"]
        self.workspace_name = self.configuration["workspace_name"]
        self.data_path = self.configuration["data_path"]

        # Created lazily by _get_service_client() and reused afterwards, so
        # a single credential (and its token cache) serves the whole sync.
        self._service_client = None

    @classmethod
    def get_default_configuration(cls):
        """Get the default configuration for OneLake

        Returns:
            dictionary: Default configuration
        """
        return {
            "tentant_id": {
                "label": "OneLake tenant id",
                "order": 1,
                "type": "str",
            },
            "client_id": {
                "label": "OneLake client id",
                "order": 2,
                "type": "str",
            },
            "client_secret": {
                "label": "OneLake client secret",
                "order": 3,
                "type": "str",
                "sensitive": True,  # Hide sensitive data like passwords or secrets
            },
            "workspace_name": {
                "label": "OneLake workspace name",
                "order": 4,
                "type": "str",
            },
            "data_path": {
                "label": "OneLake data path",
                "tooltip": "Path in format <DataLake>.Lakehouse/files/<Folder path>",
                "order": 5,
                "type": "str",
            },
            "account_name": {
                "tooltip": "In the most cases is 'onelake'",
                "default_value": ACCOUNT_NAME,
                "label": "Account name",
                "order": 6,
                "type": "str",
            },
        }

    async def _run_blocking(self, func, *args):
        """Run a blocking callable in the default executor

        Args:
            func (callable): Synchronous callable to execute.
            *args: Positional arguments passed to ``func``.

        Returns:
            obj: Whatever ``func`` returns.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func, *args)

    async def _iterate_blocking(self, iterable):
        """Iterate a synchronous iterable without blocking the event loop

        Each item is fetched with ``next()`` inside the default executor,
        because advancing an SDK pager or download stream may trigger a
        network request.

        Args:
            iterable (iterable): Synchronous iterable to consume.

        Yields:
            obj: Items of ``iterable`` in their original order.
        """
        iterator = iter(iterable)
        while True:
            item = await self._run_blocking(
                next, iterator, self._END_OF_ITERATION
            )
            if item is self._END_OF_ITERATION:
                return
            yield item

    async def ping(self):
        """Verify the connection with OneLake

        Raises:
            Exception: Re-raises whatever the SDK raised while listing
                the configured data path.
        """
        self._logger.info("Generating file system client...")

        try:
            paths = await self._get_directory_paths(self.configuration["data_path"])
            # The listing is lazy: no request is sent until the first entry
            # is requested, so pull one entry to really contact the service.
            await self._run_blocking(next, iter(paths), None)
            self._logger.info("Connection to OneLake successful")
        except Exception:
            self._logger.exception("Error while connecting to OneLake.")
            raise

    def _get_account_url(self):
        """Get the account URL for OneLake

        Returns:
            str: Account URL
        """
        return f"https://{self.configuration['account_name']}.dfs.fabric.microsoft.com"

    def _get_token_credentials(self):
        """Get the token credentials for OneLake

        Returns:
            obj: Token credentials
        """
        tentant_id = self.configuration["tentant_id"]
        client_id = self.configuration["client_id"]
        client_secret = self.configuration["client_secret"]

        try:
            return ClientSecretCredential(tentant_id, client_id, client_secret)
        except Exception as e:
            self._logger.error(f"Error while getting token credentials: {e}")
            raise

    def _get_service_client(self):
        """Get the service client for OneLake

        The client is created on first use and cached on the instance, so
        later calls do not build a new credential and client again.

        Returns:
            obj: Service client
        """
        if self._service_client is not None:
            return self._service_client

        try:
            self._service_client = DataLakeServiceClient(
                account_url=self._get_account_url(),
                credential=self._get_token_credentials(),
            )
        except Exception as e:
            self._logger.error(f"Error while getting service client: {e}")
            raise

        return self._service_client

    def _get_file_system_client(self):
        """Get the file system client for OneLake

        Returns:
            obj: File system client
        """
        try:
            return self._get_service_client().get_file_system_client(
                self.configuration["workspace_name"]
            )
        except Exception as e:
            self._logger.error(f"Error while getting file system client: {e}")
            raise

    def _get_directory_client(self):
        """Get the directory client for OneLake

        Returns:
            obj: Directory client
        """
        try:
            return self._get_file_system_client().get_directory_client(
                self.configuration["data_path"]
            )
        except Exception as e:
            self._logger.error(f"Error while getting directory client: {e}")
            raise

    async def _get_file_client(self, file_name):
        """Get file client from OneLake

        Args:
            file_name (str): name of the file, relative to the configured data path

        Returns:
            obj: File client
        """
        try:
            return self._get_directory_client().get_file_client(file_name)
        except Exception as e:
            self._logger.error(f"Error while getting file client: {e}")
            raise

    async def _get_directory_paths(self, directory_path):
        """List directory paths from data lake

        Only the direct children of ``directory_path`` are listed: files
        are later addressed by their base name relative to the configured
        data path, which cannot identify entries of nested folders.

        Args:
            directory_path (str): Directory path

        Returns:
            iterable: Lazy iterable of paths; requests are sent while iterating
        """
        try:
            return self._get_file_system_client().get_paths(
                path=directory_path, recursive=False
            )
        except Exception as e:
            self._logger.error(f"Error while getting directory paths: {e}")
            raise

    async def format_file(self, file_client):
        """Format file_client to be processed

        Args:
            file_client (obj): File object

        Returns:
            dict: Formatted file
        """
        try:
            file_properties = await self._run_blocking(
                file_client.get_file_properties
            )
            base_name = file_properties.name.split("/")[-1]

            return {
                "_id": f"{file_client.file_system_name}_{base_name}",
                "name": base_name,
                "created_at": file_properties.creation_time.isoformat(),
                "_timestamp": file_properties.last_modified.isoformat(),
                "size": file_properties.size,
            }
        except Exception as e:
            self._logger.error(
                f"Error while formatting file or getting file properties: {e}"
            )
            raise

    async def download_file(self, file_client):
        """Download file from OneLake

        Args:
            file_client (obj): File client

        Yields:
            bytes: Chunks of the file content
        """
        try:
            download = await self._run_blocking(file_client.download_file)

            # Fetching a chunk may hit the network, so advance the stream
            # from the executor instead of the event loop thread.
            async for chunk in self._iterate_blocking(download.chunks()):
                yield chunk
        except Exception as e:
            self._logger.error(f"Error while downloading file: {e}")
            raise

    async def get_content(self, file_name, doit=None, timestamp=None):
        """Obtains the file content for the specified file in `file_name`.

        Args:
            file_name (obj): The file name to process to obtain the content.
            doit (boolean, optional): Boolean value for whether to get content or not. Defaults to None.
            timestamp (timestamp, optional): Timestamp of blob last modified. Defaults to None. Currently unused.

        Returns:
            dict: Document with the extracted content, the bare metadata
                document when the file cannot be downloaded or extracted,
                or None when `doit` is falsy.
        """
        if not doit:
            return

        file_client = await self._get_file_client(file_name)
        file_properties = await self._run_blocking(file_client.get_file_properties)
        base_name = file_properties.name.split("/")[-1]
        file_extension = self.get_file_extension(file_name)

        # Same "_id" and ISO-8601 timestamps as format_file(), so both
        # documents describing a file agree with each other.
        doc = {
            "_id": f"{file_client.file_system_name}_{base_name}",
            "name": base_name,
            "_timestamp": file_properties.last_modified.isoformat(),
            "created_at": file_properties.creation_time.isoformat(),
        }

        can_be_downloaded = self.can_file_be_downloaded(
            file_extension=file_extension,
            filename=file_properties.name,
            file_size=file_properties.size,
        )

        if not can_be_downloaded:
            return doc

        extracted_doc = await self.download_and_extract_file(
            doc=doc,
            source_filename=base_name,
            file_extension=file_extension,
            download_func=partial(self.download_file, file_client),
        )

        return extracted_doc if extracted_doc is not None else doc

    async def prepare_files(self, doc_paths):
        """Prepare files for processing

        Directory entries are skipped because they have no content to index.

        Args:
            doc_paths (iterable): Paths extracted from OneLake

        Yields:
            coroutine: Un-awaited `format_file` call; awaiting it returns
                the formatted file document.
        """
        async for path in self._iterate_blocking(doc_paths):
            if getattr(path, "is_directory", False):
                continue

            file_name = path.name.split("/")[-1]
            file_client = await self._get_file_client(file_name)

            yield self.format_file(file_client)

    async def get_docs(self, filtering=None):
        """Get documents from OneLake and index them

        Args:
            filtering (obj, optional): Not used by this data source. Defaults to None.

        Yields:
            tuple: dictionary with meta-data of each file and a partial function to get the file content.
        """
        directory_paths = await self._get_directory_paths(
            self.configuration["data_path"]
        )

        async for file in self.prepare_files(directory_paths):
            file_dict = await file

            yield file_dict, partial(self.get_content, file_dict["name"])