in src/dfcx_scrapi/tools/agent_task_generator.py [0:0]
def load_agent_tasks(self, force_reload: bool = False):
"""Check for cached Agent Task map or load new."""
if self.gcs:
data = self.load_or_create_task_file_gcs()
else:
data = self.load_or_create_task_file_local()
tasks = data.get("tasks", None)
# If the task metadata exists, but user wants to force reload anyways
if tasks and force_reload:
logging.info("Forcing reload of Agent Tasks...")
tasks = self.get_agent_tasks()
self.write_agent_tasks_to_local(tasks)
if self.gcs:
self.write_agent_tasks_to_gcs(tasks)
# Check if tasks exists and if not, create
if not tasks:
logging.info("Agent Task list does not exist. Creating...")
tasks = self.get_agent_tasks()
if self.gcs:
self.write_agent_tasks_to_gcs(tasks)
else:
self.write_agent_tasks_to_local(tasks)
if isinstance(tasks, list):
tasks = {"tasks": tasks}
return tasks