orc/plugins/Security/native_function.py (108 lines of code) (raw):

from shared.util import get_secret # from semantic_kernel.skill_definition import sk_function from semantic_kernel.functions import kernel_function import logging import os import aiohttp import sys from typing import Dict if sys.version_info >= (3, 9): from typing import Annotated else: from typing_extensions import Annotated SECURITY_HUB_ENDPOINT = os.environ["SECURITY_HUB_ENDPOINT"] APIM_ENABLED=os.environ.get("APIM_ENABLED", "false") APIM_ENABLED=True if APIM_ENABLED.lower() == "true" else False class Security: @kernel_function( description="Check security of question.", name="QuestionSecurityCheck", ) async def QuestionSecurityCheck( self, question: Annotated[str, "The user question"], security_hub_key: Annotated[str, "The key to access the security hub"] ) -> Annotated[bool, "Passed security checks"]: if APIM_ENABLED: security_hub_endpoint=os.environ["APIM_SECURITY_HUB_ENDPOINT"] else: security_hub_endpoint=SECURITY_HUB_ENDPOINT try: async with aiohttp.ClientSession() as session: # Make a POST request using the session.post() method async with session.post( security_hub_endpoint+"/QuestionChecks", json={"question": question}, headers={"x-functions-key": security_hub_key} ) as request: if request.status != 200: logging.error(f"Error requesting security hub: {request.status} {request.reason}") raise(Exception(f"Error requesting security hub: {request.status} {request.reason}")) else: result = await request.json() return result except Exception as e: logging.error(f"Error requesting security hub: {str(e)}") raise(Exception(f"Error requesting security hub: {str(e)}")) @kernel_function( description="Check security of generated answer.", name="AnswerSecurityCheck", ) async def AnswerSecurityCheck( self, question: Annotated[str, "The user question"], answer: Annotated[str, "The answer generated by the model"], sources: Annotated[str, "The sources to search for the answer"], security_hub_key: Annotated[str, "The key to access the security hub"] ) -> Annotated[bool, "Passed security checks"]: if APIM_ENABLED: security_hub_endpoint=os.environ["APIM_SECURITY_HUB_ENDPOINT"] else: security_hub_endpoint=SECURITY_HUB_ENDPOINT try: async with aiohttp.ClientSession() as session: # Make a POST request using the session.post() method async with session.post( security_hub_endpoint+"/AnswerChecks", json={"question": question, "answer": answer, "sources": sources}, headers={"x-functions-key": security_hub_key} ) as request: if request.status != 200: logging.error(f"Error requesting security hub: {request.status} {request.reason} {await request.text()}") raise(Exception(f"Error requesting security hub: {request.status} {request.reason} {await request.text()}")) else: result = await request.json() return result except Exception as e: logging.error(f"Error requesting security hub: {str(e)}") raise(Exception(f"Error requesting security hub: {str(e)}")) @kernel_function( description="Audit the question and answer.", name="Auditing", ) async def Auditing( self, question: str, answer: str, sources: str, security_checks: str, conversation_id: str, security_hub_key: str ) -> Dict: if APIM_ENABLED: security_hub_endpoint=os.environ["APIM_SECURITY_HUB_ENDPOINT"] else: security_hub_endpoint=SECURITY_HUB_ENDPOINT try: async with aiohttp.ClientSession() as session: # Make a POST request using the session.post() method async with session.post( security_hub_endpoint+"/audit", json={"question": question, "answer": answer, "sources": sources, "security_checks": security_checks, "conversation_id": conversation_id}, headers={"x-functions-key": security_hub_key} ) as request: if request.status != 200: logging.error(f"Error requesting security hub: {request.status} {request.reason}") raise(Exception(f"Error requesting security hub: {request.status} {request.reason}")) else: logging.info(f"Successfully audited question and answer") return except Exception as e: logging.error(f"Error requesting security hub: {str(e)}") raise(Exception(f"Error requesting security hub: {str(e)}"))