src/dfcx_scrapi/tools/agent_task_generator.py (275 lines of code) (raw):
"""Methods to Generator Agent Tasks for arbitrary Agent inputs."""
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import os
import random
from pathlib import Path
from typing import Any, Dict, List
from google.cloud.dialogflowcx_v3beta1 import types
from google.oauth2 import service_account
from vertexai.generative_models import GenerationConfig, GenerativeModel
from dfcx_scrapi.core.agents import Agents
from dfcx_scrapi.core.flows import Flows
from dfcx_scrapi.core.intents import Intents
from dfcx_scrapi.core.playbooks import Playbooks
from dfcx_scrapi.core.scrapi_base import ScrapiBase
from dfcx_scrapi.core.tools import Tools
from dfcx_scrapi.tools.gcs_utils import GcsUtils
class AgentTaskGenerator(ScrapiBase):
    """Build and cache an LLM-generated task list for a Dialogflow CX agent.

    Agent, playbook, tool, intent and flow/page details are collected through
    the SCRAPI core clients, rendered into a prompt, and sent to a generative
    model that is asked to answer with JSON following `Schemas.task_schema`.
    The result is cached in GCS when a `GcsUtils` instance is supplied,
    otherwise in a local `data` directory.
    """

    def __init__(
        self,
        agent_id: str,
        creds_path: str = None,
        creds_dict: Dict = None,
        creds: service_account.Credentials = None,
        scope=False,
        gcs: GcsUtils = None,
        debug: bool = False,
        model_name: str = "gemini-1.5-flash-001",
    ):
        """Set up the API clients, the generative model and the cache paths.

        Args:
            agent_id: Fully qualified agent resource name.
            creds_path: Path to a service account key file.
            creds_dict: Service account key contents as a dictionary.
            creds: Pre-built service account credentials.
            scope: Forwarded unchanged to `ScrapiBase`.
            gcs: Optional GCS helper; when given, tasks are cached in GCS.
            debug: When True, switch logging to debug level and log prompts.
            model_name: Generative model used to produce the task list.
        """
        super().__init__(
            creds_path=creds_path,
            creds_dict=creds_dict,
            creds=creds,
            scope=scope,
        )
        self.agent_id = agent_id
        self.debug = debug
        if self.debug:
            self.set_logging_level("debug")

        self.agents = Agents(agent_id=self.agent_id, creds=self.creds)
        self.intents = Intents(agent_id=self.agent_id, creds=self.creds)
        self.flows = Flows(agent_id=self.agent_id, creds=self.creds)
        self.playbooks = Playbooks(agent_id=self.agent_id, creds=self.creds)
        self.tools = Tools(agent_id=self.agent_id, creds=self.creds)

        self.model = GenerativeModel(
            model_name,
            system_instruction=Prompts.task_system
        )

        self.gcs = gcs
        self.task_file = "agent_tasks.json"
        # The GCS location attributes only exist when a GCS helper is given.
        if self.gcs:
            self.short_id = self.agent_id.split("/")[-1]
            self.filename = f"testwiz/{self.short_id}/data/{self.task_file}"
            self.full_path = self.gcs.get_fully_qualified_path(self.filename)

    @staticmethod
    def _local_data_dir() -> str:
        """Return the local cache directory, creating it when missing."""
        data_dir = os.path.join(Path(__file__).resolve().parents[2], "data")
        # Without this, the first local read/write on a fresh checkout or
        # install fails with FileNotFoundError.
        os.makedirs(data_dir, exist_ok=True)
        return data_dir

    @staticmethod
    def write_agent_tasks_to_local(tasks: Dict[str, Any]):
        """Write Agent Task list to Local file.

        Args:
            tasks: JSON-serializable task data to store.
        """
        task_file = "agent_tasks.json"
        cached_path = os.path.join(
            AgentTaskGenerator._local_data_dir(), task_file
        )
        # The context manager closes the file; no explicit close() needed.
        with open(cached_path, "w", encoding="UTF-8") as outfile:
            json.dump(tasks, outfile)

    @staticmethod
    def get_sample_utterances(intent: Any) -> List[str]:
        """Get 5 or less sample utterances from the intent.

        Args:
            intent: Object exposing `training_phrases`, each of which has
                `parts` carrying a `text` attribute.

        Returns:
            The text of up to five randomly chosen training phrases.
        """
        phrases = list(intent.training_phrases)
        sampled = random.sample(phrases, min(len(phrases), 5))
        # A phrase may be split into several parts (e.g. around annotated
        # parameters); join them all so the utterance is not truncated, and
        # so a phrase without parts yields "" instead of an IndexError.
        return ["".join(part.text for part in tp.parts) for tp in sampled]

    def gather_tool_spec_details(self, tool: types.Tool):
        """Determine Tool type and extract actions.

        Args:
            tool: The Tool to inspect.

        Returns:
            A `(actions, schema)` tuple. Either element may be None, and
            both are None for extension tools and unrecognised tool types.
        """
        # Datastore Type
        connections = tool.data_store_spec.data_store_connections
        if len(connections) > 0:
            return tool.display_name, connections

        # Extension Type
        if tool.extension_spec.name != "":
            return None, None

        # Function Call Type
        if tool.function_spec.input_schema:
            input_schema = self.recurse_proto_marshal_to_dict(
                tool.function_spec.input_schema
            )
            return None, input_schema

        # OpenAPI Spec Type
        if tool.open_api_spec.text_schema != "":
            return None, tool.open_api_spec.text_schema

        # Unknown/empty spec: callers unpack the result, so always hand
        # back a 2-tuple rather than falling through to an implicit None.
        return None, None

    def _call_generate_content(self, prompt: str):
        """Send the prompt to the model requesting schema-constrained JSON."""
        return self.model.generate_content(
            prompt,
            safety_settings=self.build_safety_settings(),
            generation_config=GenerationConfig(
                temperature=0.1,
                top_p=0.95,
                candidate_count=1,
                max_output_tokens=8192,
                response_mime_type="application/json",
                response_schema=Schemas.task_schema,
            )
        )

    def _generate_tasks(self, prompt: str) -> Dict[str, Any]:
        """Run the prompt through the model and parse the JSON response."""
        if self.debug:
            logging.debug(prompt)
        res = self._call_generate_content(prompt)
        return json.loads(res.text)

    def load_or_create_task_file_gcs(self):
        """Load or create Agent Task file from GCS.

        Returns:
            The parsed file contents, or `{"tasks": None}` after writing
            that placeholder to GCS when no file content was loaded.
        """
        data = {"tasks": None}
        data_from_file = self.gcs.load_file_if_exists(
            bucket_name=self.gcs.bucket_name,
            filename=self.full_path
        )
        if data_from_file:
            logging.info("Loading Task List from GCS cache...")
            data = json.loads(data_from_file)
        else:
            self.gcs.write_dict_to_gcs(
                bucket_name=self.gcs.bucket_name,
                data=data,
                filename=self.full_path
            )
        return data

    def load_or_create_task_file_local(self):
        """Load or create Agent task file from Local.

        Returns:
            The parsed file contents, or `{"tasks": None}` after writing
            that placeholder when the file did not exist.
        """
        cached_path = os.path.join(self._local_data_dir(), self.task_file)
        data = {"tasks": None}

        # Create local if file doesn't exist
        if not os.path.isfile(cached_path):
            logging.info("Creating Agent Task File...")
            with open(cached_path, "w", encoding="UTF-8") as newfile:
                json.dump(data, newfile)
        else:
            with open(cached_path, "r", encoding="UTF-8") as infile:
                data = json.load(infile)
        return data

    def write_agent_tasks_to_gcs(self, tasks: Dict[str, Any]):
        """Write Agent Task list to GCS file.

        Args:
            tasks: Task data to store at `self.full_path`.
        """
        self.gcs.write_dict_to_gcs(
            bucket_name=self.gcs.bucket_name,
            data=tasks,
            filename=self.full_path
        )

    def _persist_agent_tasks(self, tasks: Dict[str, Any]):
        """Write tasks to the same backend the cache is loaded from."""
        if self.gcs:
            self.write_agent_tasks_to_gcs(tasks)
        else:
            self.write_agent_tasks_to_local(tasks)

    def load_agent_tasks(self, force_reload: bool = False):
        """Check for cached Agent Task map or load new.

        Args:
            force_reload: Regenerate the tasks even when a cached list
                exists.

        Returns:
            The task data; a cached plain list is wrapped as
            `{"tasks": [...]}`.
        """
        if self.gcs:
            data = self.load_or_create_task_file_gcs()
        else:
            data = self.load_or_create_task_file_local()
        tasks = data.get("tasks", None)

        # Regenerate at most once per call, and always persist to the
        # backend that is actually read from (GCS when configured, local
        # otherwise) so the next load sees the refreshed data.
        if tasks and force_reload:
            logging.info("Forcing reload of Agent Tasks...")
            tasks = self.get_agent_tasks()
            self._persist_agent_tasks(tasks)
        elif not tasks:
            logging.info("Agent Task list does not exist. Creating...")
            tasks = self.get_agent_tasks()
            self._persist_agent_tasks(tasks)

        if isinstance(tasks, list):
            tasks = {"tasks": tasks}
        return tasks

    def trim_agent_tools(
        self, playbooks: List[Any], tools: List[Any]
    ) -> Dict[str, Any]:
        """Trim down the list of agent tools to only those that are ref'd.

        Args:
            playbooks: Playbooks whose `referenced_tools` select the tools.
            tools: All tools of the agent.

        Returns:
            A dict keyed `tool_0`, `tool_1`, ... holding each referenced
            tool's name, description, actions and schemas.
        """
        referenced = {
            tool_name
            for playbook in playbooks
            for tool_name in playbook.referenced_tools
        }

        tool_info = {}
        for tool in tools:
            if tool.name not in referenced:
                continue
            actions, schema = self.gather_tool_spec_details(tool)
            # Keys are numbered by insertion order of the kept tools.
            tool_info[f"tool_{len(tool_info)}"] = {
                "name": tool.display_name,
                "description": tool.description,
                "actions": actions,
                "schemas": schema
            }
        return tool_info

    def get_intent_info(self):
        """Get structured intent info from Agent to use in prompt.

        Returns:
            A list of dicts with `display_name`, `description` and
            `sample_utterances`, skipping the two default intents.
        """
        exclude_intents = {
            "Default Welcome Intent",
            "Default Negative Intent"
        }
        all_intents = self.intents.list_intents(self.agent_id)
        return [
            {
                "display_name": intent.display_name,
                "description": intent.description,
                "sample_utterances": self.get_sample_utterances(intent)
            }
            for intent in all_intents
            if intent.display_name not in exclude_intents
        ]

    def get_playbook_steps(self, instruction: types.Playbook.Instruction):
        """Extract the playbook plain text steps.

        Returns:
            The `text` of every entry in `instruction.steps`, in order.
        """
        return [step.text for step in instruction.steps]

    def trim_agent_data(self, agent: Any):
        """Select only the relevant info from the agent proto."""
        return {
            "name": agent.display_name,
            "supported_languages": agent.supported_language_codes,
            "time_zone": agent.time_zone,
        }

    def trim_playbook_data(self, playbooks: Any):
        """Select only the relevant info from the playbook protos.

        Returns:
            A dict keyed `agent_0`, `agent_1`, ... holding each playbook's
            name, goal and instruction steps.
        """
        return {
            f"agent_{i}": {
                "name": playbook.display_name,
                "goal": playbook.goal,
                "instructions": self.get_playbook_steps(playbook.instruction)
            }
            for i, playbook in enumerate(playbooks)
        }

    def get_agent_tasks(self) -> Dict[str, Any]:
        """Extract App/Agent details and determine Agent's task list.

        Returns:
            The model response parsed from JSON.
        """
        logging.info("*** AUTO-GENERATING AGENT TASK LIST ***")

        agent = self.trim_agent_data(self.agents.get_agent(self.agent_id))
        playbooks = self.playbooks.list_playbooks(self.agent_id)
        tools = self.tools.list_tools(self.agent_id)

        # Order matters so that Tool trim can happen before playbook trim
        tools = self.trim_agent_tools(playbooks, tools)
        playbooks = self.trim_playbook_data(playbooks)

        intent_info = self.get_intent_info()
        flow_page_info = self.flows.get_flow_page_map(self.agent_id)

        # Placeholders are substituted one after another in this order.
        substitutions = {
            "{AGENT}": agent,
            "{PLAYBOOKS}": playbooks,
            "{TOOLS}": tools,
            "{INTENTS}": intent_info,
            "{FLOWS_AND_PAGES}": flow_page_info,
        }
        prompt = Prompts.task_main
        for placeholder, value in substitutions.items():
            prompt = prompt.replace(placeholder, str(value))

        return self._generate_tasks(prompt)

    def get_agent_tasks_from_user_input(
            self, tasks: Any) -> Dict[str, Any]:
        """Given an arbitrary user input, clean and format with LLM.

        Args:
            tasks: User-provided description; it is converted with `str()`
                before being placed in the prompt.

        Returns:
            The model response parsed from JSON.
        """
        logging.info("*** FORMATTING AGENT TASK LIST ***")
        prompt = Prompts.user_task_main.replace("{USER_DETAILS}", str(tasks))
        return self._generate_tasks(prompt)
class Prompts:
    """Prompt text used when asking the model for an agent task list."""

    # Template for auto-generated task lists. The {AGENT}, {PLAYBOOKS},
    # {FLOWS_AND_PAGES}, {INTENTS} and {TOOLS} markers are filled in with
    # str.replace by the caller.
    task_main: str = (
        "**APP DETAILS**\n"
        "{AGENT}\n"
        "**AGENT DETAILS**\n"
        "{PLAYBOOKS}\n"
        "{FLOWS_AND_PAGES}\n"
        "{INTENTS}\n"
        "**TOOL DETAILS**\n"
        "{TOOLS}\n"
    )

    # System instruction handed to the generative model. Kept as a single
    # verbatim literal because every character is part of the prompt.
    task_system: str = """You are a senior virtual agent evaluator.
Your job is to determine the core tasks that a virtual agent can handle based on its provided details, playbook, and available tools.
Focus on the main functionalities that would be useful to an end user rather than individual steps or specific details within the process. Do not include generic functionalities such as "Intent Detection" or "Sentiment Analysis" unless those features provide a tangible outcome for the end user. Explain each functionality clearly and concisely.
Return a concise list of the primary capabilities.
The information will be provided in a format like this:
**AGENT DETAILS**
<AGENT INFORMATION>
TOOL DETAILS:
<TOOL INFORMATION>
The resulting information should be returned in JSON format which will follow this schema:
```json
{
"type": "object",
"properties": {
"tasks": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string"
},
"description": {
"type": "string"
}
},
"required": [
"name",
"description"
]
}
}
},
"required": [
"tasks"
]
}
```
"""  # noqa: E501

    # Template for user-supplied task descriptions; {USER_DETAILS} is the
    # marker replaced by the caller.
    user_task_main: str = (
        "APP DETAILS PROVIDED BY USER:\n"
        "{USER_DETAILS}\n"
    )
class Schemas:
    """Response schemas passed to the generative model."""

    # An object with a required "tasks" array; every array item is an
    # object with required string fields "name" and "description".
    task_schema = {
        "type": "object",
        "properties": {
            "tasks": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "description": {"type": "string"},
                    },
                    "required": ["name", "description"],
                },
            },
        },
        "required": ["tasks"],
    }