tools/database/datasources.py (16 lines of code) (raw):
from .types import DataSourcesList, DataSourceItem
import os
from connectors import CosmosDBClient
async def get_all_datasources_info() -> DataSourcesList:
"""
Retrieve a list of all datasources.
Returns a DataSourcesList object with only 'datasource_name' and 'datasource_type'.
"""
# 1. Pull all documents from the `datasources` container.
cosmosdb = CosmosDBClient()
datasources_container = os.environ.get('DATASOURCES_CONTAINER', 'datasources')
documents = await cosmosdb.list_documents(datasources_container)
datasources_info = []
# 2. Process each document.
for doc in documents:
# Create a DataSourceItem with the required fields
datasource_item = DataSourceItem(
name=doc.get("id", ""),
description=doc.get("description", ""),
type=doc.get("type", "")
)
datasources_info.append(datasource_item)
# 3. Return as DataSourcesList
return DataSourcesList(datasources=datasources_info)