in pipeline/pipelineUtils/db.py [0:0]
def get_prompt_by_id(prompt_id: str):
    """
    Retrieve a prompt document by its ID from the prompts container.

    A short-lived Cosmos client is created for the lookup and closed again
    before returning, so repeated calls do not accumulate open HTTP sessions.

    Args:
        prompt_id: ID of the prompt document. It is also passed as the
            partition key value for the point read.

    Returns:
        The item returned by ``read_item``, or ``None`` when the read raises
        ``CosmosHttpResponseError`` (the error is logged).

    Note:
        Only ``CosmosHttpResponseError`` raised by the read is handled here;
        errors raised while constructing the client propagate to the caller.
    """
    # The context manager releases the client's underlying transport on every
    # exit path (success, handled Cosmos error, or unexpected exception).
    with CosmosClient(COSMOS_DB_URI, credential=config.credential) as client:
        database = client.get_database_client(COSMOS_DB_DATABASE)
        prompts_container = database.get_container_client(
            COSMOS_DB_PROMPTS_CONTAINER
        )
        try:
            # NOTE(review): using the ID as partition key presumes the
            # container is partitioned on the document id; confirm.
            return prompts_container.read_item(
                item=prompt_id,
                partition_key=prompt_id,
            )
        except exceptions.CosmosHttpResponseError as e:
            logging.error("Error retrieving prompt %s: %s", prompt_id, e)
            return None