patterns/agents/orchestrator_workers.ipynb (284 lines of code) (raw):
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Orchestrator-Workers Workflow\n",
"In this workflow, a central LLM dynamically breaks down tasks, delegates them to worker LLMs, and synthesizes their results. The implementation below covers the first two steps and returns the worker results without a final synthesis call.\n",
"\n",
"### When to use this workflow\n",
"This workflow is well-suited for complex tasks where you can't predict the subtasks in advance. Unlike simple parallelization, the subtasks aren't pre-defined: the orchestrator determines them from the specific input."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from typing import Dict, List, Optional\n",
"from util import llm_call, extract_xml\n",
"\n",
"def parse_tasks(tasks_xml: str) -> List[Dict]:\n",
"    \"\"\"Parse XML tasks into a list of task dictionaries.\n",
"\n",
"    Expects each <type> and <description> element to sit on a single line.\n",
"    \"\"\"\n",
"    tasks = []\n",
"    current_task = {}\n",
"\n",
"    for line in tasks_xml.split('\\n'):\n",
"        line = line.strip()\n",
"        if not line:\n",
"            continue\n",
"\n",
"        if line.startswith(\"<task>\"):\n",
"            current_task = {}\n",
"        elif line.startswith(\"<type>\"):\n",
"            # Drop the opening and closing tags, keeping only the text between them.\n",
"            current_task[\"type\"] = line[len(\"<type>\"):-len(\"</type>\")].strip()\n",
"        elif line.startswith(\"<description>\"):\n",
"            current_task[\"description\"] = line[len(\"<description>\"):-len(\"</description>\")].strip()\n",
"        elif line.startswith(\"</task>\"):\n",
"            # Keep only tasks that have a description; the type falls back to \"default\".\n",
"            if \"description\" in current_task:\n",
"                if \"type\" not in current_task:\n",
"                    current_task[\"type\"] = \"default\"\n",
"                tasks.append(current_task)\n",
"\n",
"    return tasks\n",
"\n",
"class FlexibleOrchestrator:\n",
"    \"\"\"Break down tasks and run the subtasks with worker LLMs, one at a time.\"\"\"\n",
"\n",
"    def __init__(\n",
"        self,\n",
"        orchestrator_prompt: str,\n",
"        worker_prompt: str,\n",
"    ):\n",
"        \"\"\"Initialize with prompt templates.\"\"\"\n",
"        self.orchestrator_prompt = orchestrator_prompt\n",
"        self.worker_prompt = worker_prompt\n",
"\n",
"    def _format_prompt(self, template: str, **kwargs) -> str:\n",
"        \"\"\"Format a prompt template with variables.\"\"\"\n",
"        try:\n",
"            return template.format(**kwargs)\n",
"        except KeyError as e:\n",
"            raise ValueError(f\"Missing required prompt variable: {e}\") from e\n",
"\n",
"    def process(self, task: str, context: Optional[Dict] = None) -> Dict:\n",
"        \"\"\"Process task by breaking it down and running each subtask in sequence.\n",
"\n",
"        Entries in context are passed to both templates as format variables;\n",
"        keys that a template does not reference are ignored.\n",
"        \"\"\"\n",
"        context = context or {}\n",
"\n",
"        # Step 1: Get orchestrator response\n",
"        orchestrator_input = self._format_prompt(\n",
"            self.orchestrator_prompt,\n",
"            task=task,\n",
"            **context\n",
"        )\n",
"        orchestrator_response = llm_call(orchestrator_input)\n",
"\n",
"        # Parse orchestrator response\n",
"        analysis = extract_xml(orchestrator_response, \"analysis\")\n",
"        tasks_xml = extract_xml(orchestrator_response, \"tasks\")\n",
"        tasks = parse_tasks(tasks_xml)\n",
"\n",
"        print(\"\\n=== ORCHESTRATOR OUTPUT ===\")\n",
"        print(f\"\\nANALYSIS:\\n{analysis}\")\n",
"        print(f\"\\nTASKS:\\n{tasks}\")\n",
"\n",
"        # Step 2: Process each task, one worker call per subtask\n",
"        worker_results = []\n",
"        for task_info in tasks:\n",
"            worker_input = self._format_prompt(\n",
"                self.worker_prompt,\n",
"                original_task=task,\n",
"                task_type=task_info['type'],\n",
"                task_description=task_info['description'],\n",
"                **context\n",
"            )\n",
"\n",
"            worker_response = llm_call(worker_input)\n",
"            result = extract_xml(worker_response, \"response\")\n",
"\n",
"            worker_results.append({\n",
"                \"type\": task_info[\"type\"],\n",
"                \"description\": task_info[\"description\"],\n",
"                \"result\": result\n",
"            })\n",
"\n",
"            print(f\"\\n=== WORKER RESULT ({task_info['type']}) ===\\n{result}\\n\")\n",
"\n",
"        return {\n",
"            \"analysis\": analysis,\n",
"            \"worker_results\": worker_results,\n",
"        }\n"
]
},
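{
"cell_type": "markdown",
"metadata": {},
"source": [
"`parse_tasks` above reads the orchestrator output line by line, so it assumes each `<type>` and `<description>` element sits on a single line. As a minimal sketch, assuming the model may sometimes wrap a description across several lines, a regex-based parser could look like the cell below. `parse_tasks_regex` and `sample` are hypothetical names that are not part of the original notebook, and the sample input is purely illustrative."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import re\n",
"from typing import Dict, List\n",
"\n",
"def parse_tasks_regex(tasks_xml: str) -> List[Dict]:\n",
"    \"\"\"Hypothetical alternative to parse_tasks that tolerates multi-line elements.\"\"\"\n",
"    tasks = []\n",
"    # re.DOTALL lets '.' match newlines, so an element may span several lines.\n",
"    for block in re.findall(r\"<task>(.*?)</task>\", tasks_xml, re.DOTALL):\n",
"        description = re.search(r\"<description>(.*?)</description>\", block, re.DOTALL)\n",
"        if description is None:\n",
"            continue  # like parse_tasks, skip tasks without a description\n",
"        task_type = re.search(r\"<type>(.*?)</type>\", block, re.DOTALL)\n",
"        tasks.append({\n",
"            \"type\": task_type.group(1).strip() if task_type else \"default\",\n",
"            \"description\": description.group(1).strip(),\n",
"        })\n",
"    return tasks\n",
"\n",
"# Illustrative input: the description is wrapped across lines.\n",
"sample = \"\"\"\n",
"<task>\n",
"  <type>formal</type>\n",
"  <description>\n",
"    Write a precise, technical version\n",
"  </description>\n",
"</task>\n",
"\"\"\"\n",
"print(parse_tasks_regex(sample))"
]
},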
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Example Use Case: Marketing Variation Generation\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"=== ORCHESTRATOR OUTPUT ===\n",
"\n",
"ANALYSIS:\n",
"\n",
"This task requires creating marketing copy for an eco-friendly water bottle, which presents multiple angles for effective communication. The key challenge is balancing environmental benefits with practical features while maintaining appeal to different consumer segments.\n",
"\n",
"Key variations would be valuable because:\n",
"1. Technical buyers need specific details about materials and environmental impact\n",
"2. Lifestyle-focused consumers respond better to emotional benefits and storytelling\n",
"3. Different tones can target distinct market segments while promoting the same core product\n",
"\n",
"The technical approach serves those who make purchase decisions based on specifications and measurable impact, while the conversational approach connects with those who buy based on lifestyle alignment and emotional resonance.\n",
"\n",
"\n",
"TASKS:\n",
"[{'type': 'formal', 'description': '>Create a specification-focused description highlighting material composition, environmental certifications, capacity measurements, and quantifiable eco-impact (e.g., \"plastic bottles saved per year\"). Include technical details about manufacturing process and recycling capabilities.<'}, {'type': 'conversational', 'description': '>Develop a narrative-style description that focuses on the user experience, lifestyle benefits, and emotional connection to environmental protection. Use relatable scenarios and casual language to help readers envision the bottle as part of their daily routine.<'}, {'type': 'hybrid', 'description': '>Combine emotional appeal with key specifications by weaving technical details into a story-driven format. Balance environmental impact statistics with aspirational messaging about sustainability lifestyle choices.<'}]\n",
"\n",
"=== WORKER RESULT (formal) ===\n",
"\n",
"Introducing the EcoVessel Pro Series: A precision-engineered hydration solution crafted from 100% post-consumer recycled stainless steel, certified by the Global Recycled Standard (GRS).\n",
"\n",
"Technical Specifications:\n",
"• Capacity: 750ml / 25.4 fl oz\n",
"• Material: 18/8 food-grade recycled stainless steel (304 grade)\n",
"• Wall thickness: 2mm double-wall vacuum insulation\n",
"• Temperature retention: 24 hours cold / 12 hours hot\n",
"• Weight: 340g / 12 oz (empty)\n",
"\n",
"Environmental Certifications:\n",
"• Carbon Neutral Product certified by Climate Partner\n",
"• BPA-free verification from NSF International\n",
"• ISO 14001 Environmental Management certification\n",
"\n",
"Manufacturing Process:\n",
"Manufactured using hydroelectric power in our carbon-neutral facility, each vessel undergoes a proprietary eco-sanitization process utilizing steam-based sterilization, eliminating chemical cleaning agents. The powder coating is applied through a zero-waste electrostatic process.\n",
"\n",
"Environmental Impact Metrics:\n",
"• Eliminates approximately 167 single-use plastic bottles annually per user\n",
"• 87% lower carbon footprint compared to traditional bottle manufacturing\n",
"• 100% recyclable at end-of-life through our closed-loop recycling program\n",
"• Saves 2,920 liters of water annually through eliminated plastic bottle production\n",
"\n",
"Each unit includes a digital tracking code for real-time impact monitoring and verification of authenticity. Engineered for a minimum 10-year service life under normal usage conditions.\n",
"\n",
"\n",
"\n",
"=== WORKER RESULT (conversational) ===\n",
"\n",
"\n",
"\n",
"=== WORKER RESULT (hybrid) ===\n",
"\n",
"Meet the AquaVerde Elite - where your daily hydration ritual becomes a powerful statement for our planet's future.\n",
"\n",
"Imagine starting your day knowing that every sip you take helps prevent up to 167 single-use plastic bottles from entering our oceans annually. The AquaVerde Elite isn't just a water bottle; it's your personal ambassador in the fight against plastic pollution, crafted from aerospace-grade recycled stainless steel that's been given a second life.\n",
"\n",
"Built to accompany you through life's adventures, this 24oz companion features our innovative ThermaLock™ technology, maintaining your cold drinks frosty for 24 hours or your hot beverages steaming for 12 hours. The double-wall vacuum insulation isn't just about performance - it's engineered to use 30% less material than conventional designs while delivering superior temperature retention.\n",
"\n",
"The bottle's sleek silhouette houses thoughtful details that enhance your daily experience: a leak-proof AutoSeal cap that operates with one hand, a built-in carrying loop made from recycled climbing rope, and our signature CloudTouch™ exterior finish that's both grippy and gorgeous. Available in four nature-inspired colors (Ocean Deep, Forest Canopy, Desert Dawn, and Mountain Mist), each bottle's finish is created using a water-based, zero-VOC coating process.\n",
"\n",
"But perhaps the most beautiful feature is what you don't see - every AquaVerde Elite helps fund clean water projects in developing communities, with 2% of each purchase supporting water conservation initiatives worldwide. Your choice to carry the AquaVerde Elite isn't just about staying hydrated; it's about being part of a global movement toward a more sustainable future.\n",
"\n",
"Specifications that matter:\n",
"• Capacity: 24oz/710ml\n",
"• Weight: 12.8oz\n",
"• Materials: 90% recycled 18/8 stainless steel\n",
"• BPA-free, phthalate-free\n",
"• Dishwasher safe\n",
"• Lifetime warranty\n",
"\n",
"Join the growing community of AquaVerde carriers who've collectively prevented over 2 million single-use bottles from entering our ecosystems. Because every drop counts, and every choice matters.\n",
"\n",
"\n"
]
}
],
"source": [
"ORCHESTRATOR_PROMPT = \"\"\"\n",
"Analyze this task and break it down into 2-3 distinct approaches:\n",
"\n",
"Task: {task}\n",
"\n",
"Return your response in this format:\n",
"\n",
"<analysis>\n",
"Explain your understanding of the task and which variations would be valuable.\n",
"Focus on how each approach serves different aspects of the task.\n",
"</analysis>\n",
"\n",
"<tasks>\n",
"    <task>\n",
"        <type>formal</type>\n",
"        <description>Write a precise, technical version that emphasizes specifications</description>\n",
"    </task>\n",
"    <task>\n",
"        <type>conversational</type>\n",
"        <description>Write an engaging, friendly version that connects with readers</description>\n",
"    </task>\n",
"</tasks>\n",
"\"\"\"\n",
"\n",
"WORKER_PROMPT = \"\"\"\n",
"Generate content based on:\n",
"Task: {original_task}\n",
"Style: {task_type}\n",
"Guidelines: {task_description}\n",
"\n",
"Return your response in this format:\n",
"\n",
"<response>\n",
"Your content here, maintaining the specified style and fully addressing requirements.\n",
"</response>\n",
"\"\"\"\n",
"\n",
"\n",
"orchestrator = FlexibleOrchestrator(\n",
"    orchestrator_prompt=ORCHESTRATOR_PROMPT,\n",
"    worker_prompt=WORKER_PROMPT,\n",
")\n",
"\n",
"results = orchestrator.process(\n",
"    task=\"Write a product description for a new eco-friendly water bottle\",\n",
"    context={\n",
"        \"target_audience\": \"environmentally conscious millennials\",\n",
"        \"key_features\": [\"plastic-free\", \"insulated\", \"lifetime warranty\"]\n",
"    }\n",
")"
]
}
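,
{
"cell_type": "markdown",
"metadata": {},
"source": [
"`process` calls the workers one after another. Because the subtasks are independent once the orchestrator has produced them, the worker calls could also run concurrently. The cell below is a minimal sketch of that idea, assuming `llm_call` is a blocking network call that is safe to invoke from several threads, which this notebook does not confirm. `run_workers_in_parallel` and `max_workers` are hypothetical names, not part of the original code. The usage at the end reuses the subtasks from the run above and issues new LLM calls."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from concurrent.futures import ThreadPoolExecutor\n",
"\n",
"def run_workers_in_parallel(orchestrator, task, tasks, context=None, max_workers=4):\n",
"    \"\"\"Hypothetical helper: run one worker call per parsed subtask concurrently.\"\"\"\n",
"    context = context or {}\n",
"\n",
"    def run_one(task_info):\n",
"        worker_input = orchestrator._format_prompt(\n",
"            orchestrator.worker_prompt,\n",
"            original_task=task,\n",
"            task_type=task_info[\"type\"],\n",
"            task_description=task_info[\"description\"],\n",
"            **context,\n",
"        )\n",
"        # Assumption: llm_call can be called safely from multiple threads.\n",
"        result = extract_xml(llm_call(worker_input), \"response\")\n",
"        return {\n",
"            \"type\": task_info[\"type\"],\n",
"            \"description\": task_info[\"description\"],\n",
"            \"result\": result,\n",
"        }\n",
"\n",
"    # executor.map returns results in the same order as the input tasks.\n",
"    with ThreadPoolExecutor(max_workers=max_workers) as executor:\n",
"        return list(executor.map(run_one, tasks))\n",
"\n",
"# Each entry in results[\"worker_results\"] carries \"type\" and \"description\".\n",
"parallel_results = run_workers_in_parallel(\n",
"    orchestrator,\n",
"    task=\"Write a product description for a new eco-friendly water bottle\",\n",
"    tasks=results[\"worker_results\"],\n",
")"
]
}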
],
"metadata": {
"kernelspec": {
"display_name": "py311",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.6"
}
},
"nbformat": 4,
"nbformat_minor": 2
}